1use camel_api::aggregator::{AggregationStrategy, AggregatorConfig, CompletionCondition};
2use camel_api::body::Body;
3use camel_api::body_converter::BodyType;
4use camel_api::circuit_breaker::CircuitBreakerConfig;
5use camel_api::dynamic_router::{DynamicRouterConfig, RouterExpression};
6use camel_api::error_handler::{ErrorHandlerConfig, RedeliveryPolicy};
7use camel_api::load_balancer::LoadBalancerConfig;
8use camel_api::multicast::{MulticastConfig, MulticastStrategy};
9use camel_api::routing_slip::{RoutingSlipConfig, RoutingSlipExpression};
10use camel_api::splitter::SplitterConfig;
11use camel_api::throttler::{ThrottleStrategy, ThrottlerConfig};
12use camel_api::{
13 BoxProcessor, CamelError, CanonicalRouteSpec, Exchange, FilterPredicate, IdentityProcessor,
14 ProcessorFn, Value,
15 runtime::{
16 CanonicalAggregateSpec, CanonicalAggregateStrategySpec, CanonicalCircuitBreakerSpec,
17 CanonicalSplitAggregationSpec, CanonicalSplitExpressionSpec, CanonicalStepSpec,
18 CanonicalWhenSpec,
19 },
20};
21use camel_component::ConcurrencyModel;
22use camel_core::route::{BuilderStep, DeclarativeWhenStep, RouteDefinition, WhenStep};
23use camel_processor::{
24 ConvertBodyTo, DynamicSetHeader, LogLevel, MapBody, MarshalService, SetBody, SetHeader,
25 UnmarshalService, builtin_data_format,
26};
27
28pub trait StepAccumulator: Sized {
34 fn steps_mut(&mut self) -> &mut Vec<BuilderStep>;
35
36 fn to(mut self, endpoint: impl Into<String>) -> Self {
37 self.steps_mut().push(BuilderStep::To(endpoint.into()));
38 self
39 }
40
41 fn process<F, Fut>(mut self, f: F) -> Self
42 where
43 F: Fn(Exchange) -> Fut + Send + Sync + 'static,
44 Fut: std::future::Future<Output = Result<Exchange, CamelError>> + Send + 'static,
45 {
46 let svc = ProcessorFn::new(f);
47 self.steps_mut()
48 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
49 self
50 }
51
52 fn process_fn(mut self, processor: BoxProcessor) -> Self {
53 self.steps_mut().push(BuilderStep::Processor(processor));
54 self
55 }
56
57 fn set_header(mut self, key: impl Into<String>, value: impl Into<Value>) -> Self {
58 let svc = SetHeader::new(IdentityProcessor, key, value);
59 self.steps_mut()
60 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
61 self
62 }
63
64 fn map_body<F>(mut self, mapper: F) -> Self
65 where
66 F: Fn(Body) -> Body + Clone + Send + Sync + 'static,
67 {
68 let svc = MapBody::new(IdentityProcessor, mapper);
69 self.steps_mut()
70 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
71 self
72 }
73
74 fn set_body<B>(mut self, body: B) -> Self
75 where
76 B: Into<Body> + Clone + Send + Sync + 'static,
77 {
78 let body: Body = body.into();
79 let svc = SetBody::new(IdentityProcessor, move |_ex: &Exchange| body.clone());
80 self.steps_mut()
81 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
82 self
83 }
84
85 fn transform<B>(self, body: B) -> Self
90 where
91 B: Into<Body> + Clone + Send + Sync + 'static,
92 {
93 self.set_body(body)
94 }
95
96 fn set_body_fn<F>(mut self, expr: F) -> Self
97 where
98 F: Fn(&Exchange) -> Body + Clone + Send + Sync + 'static,
99 {
100 let svc = SetBody::new(IdentityProcessor, expr);
101 self.steps_mut()
102 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
103 self
104 }
105
106 fn set_header_fn<F>(mut self, key: impl Into<String>, expr: F) -> Self
107 where
108 F: Fn(&Exchange) -> Value + Clone + Send + Sync + 'static,
109 {
110 let svc = DynamicSetHeader::new(IdentityProcessor, key, expr);
111 self.steps_mut()
112 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
113 self
114 }
115
116 fn aggregate(mut self, config: AggregatorConfig) -> Self {
117 self.steps_mut().push(BuilderStep::Aggregate { config });
118 self
119 }
120
121 fn stop(mut self) -> Self {
127 self.steps_mut().push(BuilderStep::Stop);
128 self
129 }
130
131 fn log(mut self, message: impl Into<String>, level: LogLevel) -> Self {
135 self.steps_mut().push(BuilderStep::Log {
136 level,
137 message: message.into(),
138 });
139 self
140 }
141
142 fn convert_body_to(mut self, target: BodyType) -> Self {
154 let svc = ConvertBodyTo::new(IdentityProcessor, target);
155 self.steps_mut()
156 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
157 self
158 }
159
160 fn marshal(mut self, format: impl Into<String>) -> Self {
170 let name = format.into();
171 let df =
172 builtin_data_format(&name).unwrap_or_else(|| panic!("unknown data format: '{name}'"));
173 let svc = MarshalService::new(IdentityProcessor, df);
174 self.steps_mut()
175 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
176 self
177 }
178
179 fn unmarshal(mut self, format: impl Into<String>) -> Self {
189 let name = format.into();
190 let df =
191 builtin_data_format(&name).unwrap_or_else(|| panic!("unknown data format: '{name}'"));
192 let svc = UnmarshalService::new(IdentityProcessor, df);
193 self.steps_mut()
194 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
195 self
196 }
197
198 fn script(mut self, language: impl Into<String>, script: impl Into<String>) -> Self {
209 self.steps_mut().push(BuilderStep::Script {
210 language: language.into(),
211 script: script.into(),
212 });
213 self
214 }
215}
216
/// Fluent builder for a single route.
///
/// Created with `RouteBuilder::from`, filled through chained calls and finished with
/// `build` or `build_canonical`.
pub struct RouteBuilder {
    // Source endpoint URI; `build` rejects an empty value.
    from_uri: String,
    // Top-level steps in the order they were added.
    steps: Vec<BuilderStep>,
    // Explicit configuration passed to `.error_handler(config)`, if any.
    error_handler: Option<ErrorHandlerConfig>,
    // Tracks which error-handling style (explicit vs. shorthand) has been used so far.
    error_handler_mode: ErrorHandlerMode,
    // Configuration passed to `.circuit_breaker(config)`, if any.
    circuit_breaker_config: Option<CircuitBreakerConfig>,
    // Set by `.concurrent(max)` or `.sequential()`; `None` when neither was called.
    concurrency: Option<ConcurrencyModel>,
    // Route identifier; `build` fails when it is still `None`.
    route_id: Option<String>,
    // Value passed to `.auto_startup(..)`, if any.
    auto_startup: Option<bool>,
    // Value passed to `.startup_order(..)`, if any.
    startup_order: Option<i32>,
}
239
/// Records which error-handling style has been used on a `RouteBuilder`, so that
/// `build` can reject a mix of the explicit and shorthand styles.
#[derive(Default)]
enum ErrorHandlerMode {
    /// No error-handling method has been called yet.
    #[default]
    None,
    /// Only `.error_handler(config)` has been used.
    ExplicitConfig,
    /// Only shorthand methods (`.dead_letter_channel(..)`, `.on_exception(..)`) have
    /// been used.
    Shorthand {
        // URI given to `.dead_letter_channel(..)`, if it was called.
        dlc_uri: Option<String>,
        // Policies pushed by `OnExceptionBuilder::end_on_exception`, in call order.
        specs: Vec<OnExceptionSpec>,
    },
    /// Both styles were used; `build` returns an error in this state.
    Mixed,
}
251
/// One shorthand exception policy collected by `OnExceptionBuilder` and replayed onto
/// an `ErrorHandlerConfig` in `RouteBuilder::build`.
#[derive(Clone)]
struct OnExceptionSpec {
    // Predicate deciding whether this policy applies to a given error.
    matches: std::sync::Arc<dyn Fn(&CamelError) -> bool + Send + Sync>,
    // Redelivery settings; `None` unless `.retry(..)` was called.
    retry: Option<RedeliveryPolicy>,
    // URI given to `.handled_by(..)`, if any.
    handled_by: Option<String>,
}
258
259impl RouteBuilder {
260 pub fn from(endpoint: &str) -> Self {
262 Self {
263 from_uri: endpoint.to_string(),
264 steps: Vec::new(),
265 error_handler: None,
266 error_handler_mode: ErrorHandlerMode::None,
267 circuit_breaker_config: None,
268 concurrency: None,
269 route_id: None,
270 auto_startup: None,
271 startup_order: None,
272 }
273 }
274
275 pub fn filter<F>(self, predicate: F) -> FilterBuilder
279 where
280 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
281 {
282 FilterBuilder {
283 parent: self,
284 predicate: std::sync::Arc::new(predicate),
285 steps: vec![],
286 }
287 }
288
289 pub fn choice(self) -> ChoiceBuilder {
295 ChoiceBuilder {
296 parent: self,
297 whens: vec![],
298 _otherwise: None,
299 }
300 }
301
302 pub fn wire_tap(mut self, endpoint: &str) -> Self {
306 self.steps.push(BuilderStep::WireTap {
307 uri: endpoint.to_string(),
308 });
309 self
310 }
311
312 pub fn error_handler(mut self, config: ErrorHandlerConfig) -> Self {
314 self.error_handler_mode = match self.error_handler_mode {
315 ErrorHandlerMode::None | ErrorHandlerMode::ExplicitConfig => {
316 ErrorHandlerMode::ExplicitConfig
317 }
318 ErrorHandlerMode::Shorthand { .. } | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
319 };
320 self.error_handler = Some(config);
321 self
322 }
323
324 pub fn dead_letter_channel(mut self, uri: impl Into<String>) -> Self {
326 let uri = uri.into();
327 self.error_handler_mode = match self.error_handler_mode {
328 ErrorHandlerMode::None => ErrorHandlerMode::Shorthand {
329 dlc_uri: Some(uri),
330 specs: Vec::new(),
331 },
332 ErrorHandlerMode::Shorthand { specs, .. } => ErrorHandlerMode::Shorthand {
333 dlc_uri: Some(uri),
334 specs,
335 },
336 ErrorHandlerMode::ExplicitConfig | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
337 };
338 self
339 }
340
341 pub fn on_exception<F>(mut self, matches: F) -> OnExceptionBuilder
343 where
344 F: Fn(&CamelError) -> bool + Send + Sync + 'static,
345 {
346 self.error_handler_mode = match self.error_handler_mode {
347 ErrorHandlerMode::None => ErrorHandlerMode::Shorthand {
348 dlc_uri: None,
349 specs: Vec::new(),
350 },
351 ErrorHandlerMode::ExplicitConfig | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
352 shorthand @ ErrorHandlerMode::Shorthand { .. } => shorthand,
353 };
354
355 OnExceptionBuilder {
356 parent: self,
357 policy: OnExceptionSpec {
358 matches: std::sync::Arc::new(matches),
359 retry: None,
360 handled_by: None,
361 },
362 }
363 }
364
365 pub fn circuit_breaker(mut self, config: CircuitBreakerConfig) -> Self {
367 self.circuit_breaker_config = Some(config);
368 self
369 }
370
371 pub fn concurrent(mut self, max: usize) -> Self {
385 let max = if max == 0 { None } else { Some(max) };
386 self.concurrency = Some(ConcurrencyModel::Concurrent { max });
387 self
388 }
389
390 pub fn sequential(mut self) -> Self {
395 self.concurrency = Some(ConcurrencyModel::Sequential);
396 self
397 }
398
399 pub fn route_id(mut self, id: impl Into<String>) -> Self {
403 self.route_id = Some(id.into());
404 self
405 }
406
407 pub fn auto_startup(mut self, auto: bool) -> Self {
411 self.auto_startup = Some(auto);
412 self
413 }
414
415 pub fn startup_order(mut self, order: i32) -> Self {
419 self.startup_order = Some(order);
420 self
421 }
422
423 pub fn split(self, config: SplitterConfig) -> SplitBuilder {
429 SplitBuilder {
430 parent: self,
431 config,
432 steps: Vec::new(),
433 }
434 }
435
436 pub fn multicast(self) -> MulticastBuilder {
442 MulticastBuilder {
443 parent: self,
444 steps: Vec::new(),
445 config: MulticastConfig::new(),
446 }
447 }
448
449 pub fn throttle(self, max_requests: usize, period: std::time::Duration) -> ThrottleBuilder {
456 ThrottleBuilder {
457 parent: self,
458 config: ThrottlerConfig::new(max_requests, period),
459 steps: Vec::new(),
460 }
461 }
462
463 pub fn load_balance(self) -> LoadBalancerBuilder {
469 LoadBalancerBuilder {
470 parent: self,
471 config: LoadBalancerConfig::round_robin(),
472 steps: Vec::new(),
473 }
474 }
475
476 pub fn dynamic_router(self, expression: RouterExpression) -> Self {
492 self.dynamic_router_with_config(DynamicRouterConfig::new(expression))
493 }
494
495 pub fn dynamic_router_with_config(mut self, config: DynamicRouterConfig) -> Self {
499 self.steps.push(BuilderStep::DynamicRouter { config });
500 self
501 }
502
503 pub fn routing_slip(self, expression: RoutingSlipExpression) -> Self {
504 self.routing_slip_with_config(RoutingSlipConfig::new(expression))
505 }
506
507 pub fn routing_slip_with_config(mut self, config: RoutingSlipConfig) -> Self {
508 self.steps.push(BuilderStep::RoutingSlip { config });
509 self
510 }
511
512 pub fn build(self) -> Result<RouteDefinition, CamelError> {
514 if self.from_uri.is_empty() {
515 return Err(CamelError::RouteError(
516 "route must have a 'from' URI".to_string(),
517 ));
518 }
519 let route_id = self.route_id.ok_or_else(|| {
520 CamelError::RouteError(
521 "route must have a 'route_id' — call .route_id(\"name\") on the builder"
522 .to_string(),
523 )
524 })?;
525 let resolved_error_handler = match self.error_handler_mode {
526 ErrorHandlerMode::None => self.error_handler,
527 ErrorHandlerMode::ExplicitConfig => self.error_handler,
528 ErrorHandlerMode::Mixed => {
529 return Err(CamelError::RouteError(
530 "mixed error handler modes: cannot combine .error_handler(config) with shorthand methods".into(),
531 ));
532 }
533 ErrorHandlerMode::Shorthand { dlc_uri, specs } => {
534 let mut config = if let Some(uri) = dlc_uri {
535 ErrorHandlerConfig::dead_letter_channel(uri)
536 } else {
537 ErrorHandlerConfig::log_only()
538 };
539
540 for spec in specs {
541 let matcher = spec.matches.clone();
542 let mut builder = config.on_exception(move |e| matcher(e));
543
544 if let Some(retry) = spec.retry {
545 builder = builder.retry(retry.max_attempts).with_backoff(
546 retry.initial_delay,
547 retry.multiplier,
548 retry.max_delay,
549 );
550 if retry.jitter_factor > 0.0 {
551 builder = builder.with_jitter(retry.jitter_factor);
552 }
553 }
554
555 if let Some(uri) = spec.handled_by {
556 builder = builder.handled_by(uri);
557 }
558
559 config = builder.build();
560 }
561
562 Some(config)
563 }
564 };
565
566 let definition = RouteDefinition::new(self.from_uri, self.steps);
567 let definition = if let Some(eh) = resolved_error_handler {
568 definition.with_error_handler(eh)
569 } else {
570 definition
571 };
572 let definition = if let Some(cb) = self.circuit_breaker_config {
573 definition.with_circuit_breaker(cb)
574 } else {
575 definition
576 };
577 let definition = if let Some(concurrency) = self.concurrency {
578 definition.with_concurrency(concurrency)
579 } else {
580 definition
581 };
582 let definition = definition.with_route_id(route_id);
583 let definition = if let Some(auto) = self.auto_startup {
584 definition.with_auto_startup(auto)
585 } else {
586 definition
587 };
588 let definition = if let Some(order) = self.startup_order {
589 definition.with_startup_order(order)
590 } else {
591 definition
592 };
593 Ok(definition)
594 }
595
596 pub fn build_canonical(self) -> Result<CanonicalRouteSpec, CamelError> {
598 if self.from_uri.is_empty() {
599 return Err(CamelError::RouteError(
600 "route must have a 'from' URI".to_string(),
601 ));
602 }
603 let route_id = self.route_id.ok_or_else(|| {
604 CamelError::RouteError(
605 "route must have a 'route_id' — call .route_id(\"name\") on the builder"
606 .to_string(),
607 )
608 })?;
609
610 let steps = canonicalize_steps(self.steps)?;
611 let circuit_breaker = self
612 .circuit_breaker_config
613 .map(canonicalize_circuit_breaker);
614
615 let spec = CanonicalRouteSpec {
616 route_id,
617 from: self.from_uri,
618 steps,
619 circuit_breaker,
620 version: camel_api::CANONICAL_CONTRACT_VERSION,
621 };
622 spec.validate_contract()?;
623 Ok(spec)
624 }
625}
626
627pub struct OnExceptionBuilder {
628 parent: RouteBuilder,
629 policy: OnExceptionSpec,
630}
631
632impl OnExceptionBuilder {
633 pub fn retry(mut self, max_attempts: u32) -> Self {
634 self.policy.retry = Some(RedeliveryPolicy::new(max_attempts));
635 self
636 }
637
638 pub fn with_backoff(
639 mut self,
640 initial: std::time::Duration,
641 multiplier: f64,
642 max: std::time::Duration,
643 ) -> Self {
644 if let Some(ref mut retry) = self.policy.retry {
645 retry.initial_delay = initial;
646 retry.multiplier = multiplier;
647 retry.max_delay = max;
648 }
649 self
650 }
651
652 pub fn with_jitter(mut self, jitter_factor: f64) -> Self {
653 if let Some(ref mut retry) = self.policy.retry {
654 retry.jitter_factor = jitter_factor.clamp(0.0, 1.0);
655 }
656 self
657 }
658
659 pub fn handled_by(mut self, uri: impl Into<String>) -> Self {
660 self.policy.handled_by = Some(uri.into());
661 self
662 }
663
664 pub fn end_on_exception(mut self) -> RouteBuilder {
665 if let ErrorHandlerMode::Shorthand { ref mut specs, .. } = self.parent.error_handler_mode {
666 specs.push(self.policy);
667 }
668 self.parent
669 }
670}
671
672fn canonicalize_steps(steps: Vec<BuilderStep>) -> Result<Vec<CanonicalStepSpec>, CamelError> {
673 let mut canonical = Vec::with_capacity(steps.len());
674 for step in steps {
675 canonical.push(canonicalize_step(step)?);
676 }
677 Ok(canonical)
678}
679
680fn canonicalize_step(step: BuilderStep) -> Result<CanonicalStepSpec, CamelError> {
681 match step {
682 BuilderStep::To(uri) => Ok(CanonicalStepSpec::To { uri }),
683 BuilderStep::Log { message, .. } => Ok(CanonicalStepSpec::Log { message }),
684 BuilderStep::Stop => Ok(CanonicalStepSpec::Stop),
685 BuilderStep::WireTap { uri } => Ok(CanonicalStepSpec::WireTap { uri }),
686 BuilderStep::DeclarativeScript { expression } => {
687 Ok(CanonicalStepSpec::Script { expression })
688 }
689 BuilderStep::DeclarativeFilter { predicate, steps } => Ok(CanonicalStepSpec::Filter {
690 predicate,
691 steps: canonicalize_steps(steps)?,
692 }),
693 BuilderStep::DeclarativeChoice { whens, otherwise } => {
694 let mut canonical_whens = Vec::with_capacity(whens.len());
695 for DeclarativeWhenStep { predicate, steps } in whens {
696 canonical_whens.push(CanonicalWhenSpec {
697 predicate,
698 steps: canonicalize_steps(steps)?,
699 });
700 }
701 let otherwise = match otherwise {
702 Some(steps) => Some(canonicalize_steps(steps)?),
703 None => None,
704 };
705 Ok(CanonicalStepSpec::Choice {
706 whens: canonical_whens,
707 otherwise,
708 })
709 }
710 BuilderStep::DeclarativeSplit {
711 expression,
712 aggregation,
713 parallel,
714 parallel_limit,
715 stop_on_exception,
716 steps,
717 } => Ok(CanonicalStepSpec::Split {
718 expression: CanonicalSplitExpressionSpec::Language(expression),
719 aggregation: canonicalize_split_aggregation(aggregation)?,
720 parallel,
721 parallel_limit,
722 stop_on_exception,
723 steps: canonicalize_steps(steps)?,
724 }),
725 BuilderStep::Aggregate { config } => Ok(CanonicalStepSpec::Aggregate {
726 config: canonicalize_aggregate(config)?,
727 }),
728 other => {
729 let step_name = canonical_step_name(&other);
730 let detail = camel_api::canonical_contract_rejection_reason(step_name)
731 .unwrap_or("not included in canonical v1");
732 Err(CamelError::RouteError(format!(
733 "canonical v1 does not support step `{step_name}`: {detail}"
734 )))
735 }
736 }
737}
738
739fn canonicalize_split_aggregation(
740 strategy: camel_api::splitter::AggregationStrategy,
741) -> Result<CanonicalSplitAggregationSpec, CamelError> {
742 match strategy {
743 camel_api::splitter::AggregationStrategy::LastWins => {
744 Ok(CanonicalSplitAggregationSpec::LastWins)
745 }
746 camel_api::splitter::AggregationStrategy::CollectAll => {
747 Ok(CanonicalSplitAggregationSpec::CollectAll)
748 }
749 camel_api::splitter::AggregationStrategy::Custom(_) => Err(CamelError::RouteError(
750 "canonical v1 does not support custom split aggregation".to_string(),
751 )),
752 camel_api::splitter::AggregationStrategy::Original => {
753 Ok(CanonicalSplitAggregationSpec::Original)
754 }
755 }
756}
757
758fn canonicalize_aggregate(config: AggregatorConfig) -> Result<CanonicalAggregateSpec, CamelError> {
759 let completion_size = match config.completion {
760 CompletionCondition::Size(size) => Some(size),
761 CompletionCondition::Predicate(_) => {
762 return Err(CamelError::RouteError(
763 "canonical v1 does not support aggregate predicate completion".to_string(),
764 ));
765 }
766 };
767 let strategy = match config.strategy {
768 AggregationStrategy::CollectAll => CanonicalAggregateStrategySpec::CollectAll,
769 AggregationStrategy::Custom(_) => {
770 return Err(CamelError::RouteError(
771 "canonical v1 does not support custom aggregate strategy".to_string(),
772 ));
773 }
774 };
775 let bucket_ttl_ms = config
776 .bucket_ttl
777 .map(|ttl| u64::try_from(ttl.as_millis()).unwrap_or(u64::MAX));
778
779 Ok(CanonicalAggregateSpec {
780 header: config.header_name,
781 completion_size,
782 strategy,
783 max_buckets: config.max_buckets,
784 bucket_ttl_ms,
785 })
786}
787
788fn canonicalize_circuit_breaker(config: CircuitBreakerConfig) -> CanonicalCircuitBreakerSpec {
789 CanonicalCircuitBreakerSpec {
790 failure_threshold: config.failure_threshold,
791 open_duration_ms: u64::try_from(config.open_duration.as_millis()).unwrap_or(u64::MAX),
792 }
793}
794
795fn canonical_step_name(step: &BuilderStep) -> &'static str {
796 match step {
797 BuilderStep::Processor(_) => "processor",
798 BuilderStep::To(_) => "to",
799 BuilderStep::Stop => "stop",
800 BuilderStep::Log { .. } => "log",
801 BuilderStep::DeclarativeSetHeader { .. } => "set_header",
802 BuilderStep::DeclarativeSetBody { .. } => "set_body",
803 BuilderStep::DeclarativeFilter { .. } => "filter",
804 BuilderStep::DeclarativeChoice { .. } => "choice",
805 BuilderStep::DeclarativeScript { .. } => "script",
806 BuilderStep::DeclarativeSplit { .. } => "split",
807 BuilderStep::Split { .. } => "split",
808 BuilderStep::Aggregate { .. } => "aggregate",
809 BuilderStep::Filter { .. } => "filter",
810 BuilderStep::Choice { .. } => "choice",
811 BuilderStep::WireTap { .. } => "wire_tap",
812 BuilderStep::Multicast { .. } => "multicast",
813 BuilderStep::DeclarativeLog { .. } => "log",
814 BuilderStep::Bean { .. } => "bean",
815 BuilderStep::Script { .. } => "script",
816 BuilderStep::Throttle { .. } => "throttle",
817 BuilderStep::LoadBalance { .. } => "load_balancer",
818 BuilderStep::DynamicRouter { .. } => "dynamic_router",
819 BuilderStep::RoutingSlip { .. } => "routing_slip",
820 }
821}
822
823impl StepAccumulator for RouteBuilder {
824 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
825 &mut self.steps
826 }
827}
828
829pub struct SplitBuilder {
837 parent: RouteBuilder,
838 config: SplitterConfig,
839 steps: Vec<BuilderStep>,
840}
841
842impl SplitBuilder {
843 pub fn filter<F>(self, predicate: F) -> FilterInSplitBuilder
845 where
846 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
847 {
848 FilterInSplitBuilder {
849 parent: self,
850 predicate: std::sync::Arc::new(predicate),
851 steps: vec![],
852 }
853 }
854
855 pub fn end_split(mut self) -> RouteBuilder {
858 let split_step = BuilderStep::Split {
859 config: self.config,
860 steps: self.steps,
861 };
862 self.parent.steps.push(split_step);
863 self.parent
864 }
865}
866
867impl StepAccumulator for SplitBuilder {
868 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
869 &mut self.steps
870 }
871}
872
873pub struct FilterBuilder {
875 parent: RouteBuilder,
876 predicate: FilterPredicate,
877 steps: Vec<BuilderStep>,
878}
879
880impl FilterBuilder {
881 pub fn end_filter(mut self) -> RouteBuilder {
884 let step = BuilderStep::Filter {
885 predicate: self.predicate,
886 steps: self.steps,
887 };
888 self.parent.steps.push(step);
889 self.parent
890 }
891}
892
893impl StepAccumulator for FilterBuilder {
894 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
895 &mut self.steps
896 }
897}
898
899pub struct FilterInSplitBuilder {
901 parent: SplitBuilder,
902 predicate: FilterPredicate,
903 steps: Vec<BuilderStep>,
904}
905
906impl FilterInSplitBuilder {
907 pub fn end_filter(mut self) -> SplitBuilder {
909 let step = BuilderStep::Filter {
910 predicate: self.predicate,
911 steps: self.steps,
912 };
913 self.parent.steps.push(step);
914 self.parent
915 }
916}
917
918impl StepAccumulator for FilterInSplitBuilder {
919 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
920 &mut self.steps
921 }
922}
923
924pub struct ChoiceBuilder {
931 parent: RouteBuilder,
932 whens: Vec<WhenStep>,
933 _otherwise: Option<Vec<BuilderStep>>,
934}
935
936impl ChoiceBuilder {
937 pub fn when<F>(self, predicate: F) -> WhenBuilder
940 where
941 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
942 {
943 WhenBuilder {
944 parent: self,
945 predicate: std::sync::Arc::new(predicate),
946 steps: vec![],
947 }
948 }
949
950 pub fn otherwise(self) -> OtherwiseBuilder {
954 OtherwiseBuilder {
955 parent: self,
956 steps: vec![],
957 }
958 }
959
960 pub fn end_choice(mut self) -> RouteBuilder {
964 let step = BuilderStep::Choice {
965 whens: self.whens,
966 otherwise: self._otherwise,
967 };
968 self.parent.steps.push(step);
969 self.parent
970 }
971}
972
973pub struct WhenBuilder {
975 parent: ChoiceBuilder,
976 predicate: camel_api::FilterPredicate,
977 steps: Vec<BuilderStep>,
978}
979
980impl WhenBuilder {
981 pub fn end_when(mut self) -> ChoiceBuilder {
984 self.parent.whens.push(WhenStep {
985 predicate: self.predicate,
986 steps: self.steps,
987 });
988 self.parent
989 }
990}
991
992impl StepAccumulator for WhenBuilder {
993 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
994 &mut self.steps
995 }
996}
997
998pub struct OtherwiseBuilder {
1000 parent: ChoiceBuilder,
1001 steps: Vec<BuilderStep>,
1002}
1003
1004impl OtherwiseBuilder {
1005 pub fn end_otherwise(self) -> ChoiceBuilder {
1007 let OtherwiseBuilder { mut parent, steps } = self;
1008 parent._otherwise = Some(steps);
1009 parent
1010 }
1011}
1012
1013impl StepAccumulator for OtherwiseBuilder {
1014 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1015 &mut self.steps
1016 }
1017}
1018
1019pub struct MulticastBuilder {
1027 parent: RouteBuilder,
1028 steps: Vec<BuilderStep>,
1029 config: MulticastConfig,
1030}
1031
1032impl MulticastBuilder {
1033 pub fn parallel(mut self, parallel: bool) -> Self {
1034 self.config = self.config.parallel(parallel);
1035 self
1036 }
1037
1038 pub fn parallel_limit(mut self, limit: usize) -> Self {
1039 self.config = self.config.parallel_limit(limit);
1040 self
1041 }
1042
1043 pub fn stop_on_exception(mut self, stop: bool) -> Self {
1044 self.config = self.config.stop_on_exception(stop);
1045 self
1046 }
1047
1048 pub fn timeout(mut self, duration: std::time::Duration) -> Self {
1049 self.config = self.config.timeout(duration);
1050 self
1051 }
1052
1053 pub fn aggregation(mut self, strategy: MulticastStrategy) -> Self {
1054 self.config = self.config.aggregation(strategy);
1055 self
1056 }
1057
1058 pub fn end_multicast(mut self) -> RouteBuilder {
1059 let step = BuilderStep::Multicast {
1060 steps: self.steps,
1061 config: self.config,
1062 };
1063 self.parent.steps.push(step);
1064 self.parent
1065 }
1066}
1067
1068impl StepAccumulator for MulticastBuilder {
1069 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1070 &mut self.steps
1071 }
1072}
1073
1074pub struct ThrottleBuilder {
1082 parent: RouteBuilder,
1083 config: ThrottlerConfig,
1084 steps: Vec<BuilderStep>,
1085}
1086
1087impl ThrottleBuilder {
1088 pub fn strategy(mut self, strategy: ThrottleStrategy) -> Self {
1094 self.config = self.config.strategy(strategy);
1095 self
1096 }
1097
1098 pub fn end_throttle(mut self) -> RouteBuilder {
1101 let step = BuilderStep::Throttle {
1102 config: self.config,
1103 steps: self.steps,
1104 };
1105 self.parent.steps.push(step);
1106 self.parent
1107 }
1108}
1109
1110impl StepAccumulator for ThrottleBuilder {
1111 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1112 &mut self.steps
1113 }
1114}
1115
1116pub struct LoadBalancerBuilder {
1124 parent: RouteBuilder,
1125 config: LoadBalancerConfig,
1126 steps: Vec<BuilderStep>,
1127}
1128
1129impl LoadBalancerBuilder {
1130 pub fn round_robin(mut self) -> Self {
1132 self.config = LoadBalancerConfig::round_robin();
1133 self
1134 }
1135
1136 pub fn random(mut self) -> Self {
1138 self.config = LoadBalancerConfig::random();
1139 self
1140 }
1141
1142 pub fn weighted(mut self, weights: Vec<(String, u32)>) -> Self {
1147 self.config = LoadBalancerConfig::weighted(weights);
1148 self
1149 }
1150
1151 pub fn failover(mut self) -> Self {
1156 self.config = LoadBalancerConfig::failover();
1157 self
1158 }
1159
1160 pub fn parallel(mut self, parallel: bool) -> Self {
1165 self.config = self.config.parallel(parallel);
1166 self
1167 }
1168
1169 pub fn end_load_balance(mut self) -> RouteBuilder {
1172 let step = BuilderStep::LoadBalance {
1173 config: self.config,
1174 steps: self.steps,
1175 };
1176 self.parent.steps.push(step);
1177 self.parent
1178 }
1179}
1180
1181impl StepAccumulator for LoadBalancerBuilder {
1182 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1183 &mut self.steps
1184 }
1185}
1186
1187#[cfg(test)]
1192mod tests {
1193 use super::*;
1194 use camel_api::error_handler::ErrorHandlerConfig;
1195 use camel_api::load_balancer::LoadBalanceStrategy;
1196 use camel_api::{Exchange, Message};
1197 use camel_core::route::BuilderStep;
1198 use std::sync::Arc;
1199 use std::time::Duration;
1200 use tower::{Service, ServiceExt};
1201
1202 #[test]
1203 fn test_builder_from_creates_definition() {
1204 let definition = RouteBuilder::from("timer:tick")
1205 .route_id("test-route")
1206 .build()
1207 .unwrap();
1208 assert_eq!(definition.from_uri(), "timer:tick");
1209 }
1210
1211 #[test]
1212 fn test_builder_empty_from_uri_errors() {
1213 let result = RouteBuilder::from("").route_id("test-route").build();
1214 assert!(result.is_err());
1215 }
1216
1217 #[test]
1218 fn test_builder_to_adds_step() {
1219 let definition = RouteBuilder::from("timer:tick")
1220 .route_id("test-route")
1221 .to("log:info")
1222 .build()
1223 .unwrap();
1224
1225 assert_eq!(definition.from_uri(), "timer:tick");
1226 assert!(matches!(&definition.steps()[0], BuilderStep::To(uri) if uri == "log:info"));
1228 }
1229
1230 #[test]
1231 fn test_builder_filter_adds_filter_step() {
1232 let definition = RouteBuilder::from("timer:tick")
1233 .route_id("test-route")
1234 .filter(|_ex| true)
1235 .to("mock:result")
1236 .end_filter()
1237 .build()
1238 .unwrap();
1239
1240 assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
1241 }
1242
1243 #[test]
1244 fn test_builder_set_header_adds_processor_step() {
1245 let definition = RouteBuilder::from("timer:tick")
1246 .route_id("test-route")
1247 .set_header("key", Value::String("value".into()))
1248 .build()
1249 .unwrap();
1250
1251 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1252 }
1253
1254 #[test]
1255 fn test_builder_map_body_adds_processor_step() {
1256 let definition = RouteBuilder::from("timer:tick")
1257 .route_id("test-route")
1258 .map_body(|body| body)
1259 .build()
1260 .unwrap();
1261
1262 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1263 }
1264
1265 #[test]
1266 fn test_builder_process_adds_processor_step() {
1267 let definition = RouteBuilder::from("timer:tick")
1268 .route_id("test-route")
1269 .process(|ex| async move { Ok(ex) })
1270 .build()
1271 .unwrap();
1272
1273 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1274 }
1275
1276 #[test]
1277 fn test_builder_chain_multiple_steps() {
1278 let definition = RouteBuilder::from("timer:tick")
1279 .route_id("test-route")
1280 .set_header("source", Value::String("timer".into()))
1281 .filter(|ex| ex.input.header("source").is_some())
1282 .to("log:info")
1283 .end_filter()
1284 .to("mock:result")
1285 .build()
1286 .unwrap();
1287
1288 assert_eq!(definition.steps().len(), 3); assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_))); assert!(matches!(&definition.steps()[1], BuilderStep::Filter { .. })); assert!(matches!(&definition.steps()[2], BuilderStep::To(uri) if uri == "mock:result"));
1292 }
1293
1294 #[tokio::test]
1299 async fn test_set_header_processor_works() {
1300 let mut svc = SetHeader::new(IdentityProcessor, "greeting", Value::String("hello".into()));
1301 let exchange = Exchange::new(Message::new("test"));
1302 let result = svc.call(exchange).await.unwrap();
1303 assert_eq!(
1304 result.input.header("greeting"),
1305 Some(&Value::String("hello".into()))
1306 );
1307 }
1308
1309 #[tokio::test]
1310 async fn test_filter_processor_passes() {
1311 use camel_api::BoxProcessorExt;
1312 use camel_processor::FilterService;
1313
1314 let sub = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
1315 let mut svc =
1316 FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
1317 let exchange = Exchange::new(Message::new("pass"));
1318 let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
1319 assert_eq!(result.input.body.as_text(), Some("pass"));
1320 }
1321
1322 #[tokio::test]
1323 async fn test_filter_processor_blocks() {
1324 use camel_api::BoxProcessorExt;
1325 use camel_processor::FilterService;
1326
1327 let sub = BoxProcessor::from_fn(|_ex| {
1328 Box::pin(async move { Err(CamelError::ProcessorError("should not reach".into())) })
1329 });
1330 let mut svc =
1331 FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
1332 let exchange = Exchange::new(Message::new("reject"));
1333 let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
1334 assert_eq!(result.input.body.as_text(), Some("reject"));
1335 }
1336
1337 #[tokio::test]
1338 async fn test_map_body_processor_works() {
1339 let mapper = MapBody::new(IdentityProcessor, |body: Body| {
1340 if let Some(text) = body.as_text() {
1341 Body::Text(text.to_uppercase())
1342 } else {
1343 body
1344 }
1345 });
1346 let exchange = Exchange::new(Message::new("hello"));
1347 let result = mapper.oneshot(exchange).await.unwrap();
1348 assert_eq!(result.input.body.as_text(), Some("HELLO"));
1349 }
1350
1351 #[tokio::test]
1352 async fn test_process_custom_processor_works() {
1353 let processor = ProcessorFn::new(|mut ex: Exchange| async move {
1354 ex.set_property("custom", Value::Bool(true));
1355 Ok(ex)
1356 });
1357 let exchange = Exchange::new(Message::default());
1358 let result = processor.oneshot(exchange).await.unwrap();
1359 assert_eq!(result.property("custom"), Some(&Value::Bool(true)));
1360 }
1361
1362 #[tokio::test]
1367 async fn test_compose_pipeline_runs_steps_in_order() {
1368 use camel_core::route::compose_pipeline;
1369
1370 let processors = vec![
1371 BoxProcessor::new(SetHeader::new(
1372 IdentityProcessor,
1373 "step",
1374 Value::String("one".into()),
1375 )),
1376 BoxProcessor::new(MapBody::new(IdentityProcessor, |body: Body| {
1377 if let Some(text) = body.as_text() {
1378 Body::Text(format!("{}-processed", text))
1379 } else {
1380 body
1381 }
1382 })),
1383 ];
1384
1385 let pipeline = compose_pipeline(processors);
1386 let exchange = Exchange::new(Message::new("hello"));
1387 let result = pipeline.oneshot(exchange).await.unwrap();
1388
1389 assert_eq!(
1390 result.input.header("step"),
1391 Some(&Value::String("one".into()))
1392 );
1393 assert_eq!(result.input.body.as_text(), Some("hello-processed"));
1394 }
1395
1396 #[tokio::test]
1397 async fn test_compose_pipeline_empty_is_identity() {
1398 use camel_core::route::compose_pipeline;
1399
1400 let pipeline = compose_pipeline(vec![]);
1401 let exchange = Exchange::new(Message::new("unchanged"));
1402 let result = pipeline.oneshot(exchange).await.unwrap();
1403 assert_eq!(result.input.body.as_text(), Some("unchanged"));
1404 }
1405
/// `circuit_breaker(..)` on the builder stores the config on the built route,
/// including the configured failure threshold.
#[test]
fn test_builder_circuit_breaker_sets_config() {
    use camel_api::circuit_breaker::CircuitBreakerConfig;

    let breaker = CircuitBreakerConfig::new().failure_threshold(5);
    let builder = RouteBuilder::from("timer:tick").route_id("test-route");
    let definition = builder.circuit_breaker(breaker).build().unwrap();

    let stored = definition
        .circuit_breaker_config()
        .expect("circuit breaker should be set");
    assert_eq!(stored.failure_threshold, 5);
}
1426
/// A route configured with both a circuit breaker and an explicit error
/// handler keeps both on the built definition.
///
/// The circuit breaker's threshold and the presence of the error handler are
/// both checked, so a regression where one setting clobbers the other fails
/// here.
#[test]
fn test_builder_circuit_breaker_with_error_handler() {
    use camel_api::circuit_breaker::CircuitBreakerConfig;
    use camel_api::error_handler::ErrorHandlerConfig;

    let cb_config = CircuitBreakerConfig::new().failure_threshold(3);
    let eh_config = ErrorHandlerConfig::log_only();

    let definition = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .to("log:info")
        .circuit_breaker(cb_config)
        .error_handler(eh_config)
        .build()
        .unwrap();

    let breaker = definition
        .circuit_breaker_config()
        .expect("circuit breaker config should be set");
    assert_eq!(breaker.failure_threshold, 3);

    // Setting the error handler after the breaker must not drop either one.
    let _handler = definition
        .error_handler_config()
        .expect("error handler config should be set");
}
1449
/// Two shorthand `on_exception` clauses plus a dead-letter channel produce two
/// policies in declaration order, each keeping its own retry/handler settings.
#[test]
fn test_builder_on_exception_shorthand_multiple_clauses_preserve_order() {
    let builder = RouteBuilder::from("direct:start")
        .route_id("test-route")
        .dead_letter_channel("log:dlc");

    // First clause: I/O errors, three retries, dedicated handler endpoint.
    let builder = builder
        .on_exception(|e| matches!(e, CamelError::Io(_)))
        .retry(3)
        .handled_by("log:io")
        .end_on_exception();

    // Second clause: processor errors, a single retry.
    let definition = builder
        .on_exception(|e| matches!(e, CamelError::ProcessorError(_)))
        .retry(1)
        .end_on_exception()
        .to("mock:out")
        .build()
        .expect("route should build");

    let cfg = definition
        .error_handler_config()
        .expect("error handler should be set");
    let max_attempts = |idx: usize| cfg.policies[idx].retry.as_ref().map(|p| p.max_attempts);

    assert_eq!(cfg.policies.len(), 2);
    assert_eq!(cfg.dlc_uri.as_deref(), Some("log:dlc"));
    assert_eq!(max_attempts(0), Some(3));
    assert_eq!(cfg.policies[0].handled_by.as_deref(), Some("log:io"));
    assert_eq!(max_attempts(1), Some(1));
}
1481
/// Combining an explicit `error_handler(..)` with shorthand `on_exception`
/// clauses makes `build()` fail with a "mixed error handler modes" error.
#[test]
fn test_builder_on_exception_mixed_mode_rejected() {
    let explicit = RouteBuilder::from("direct:start")
        .route_id("test-route")
        .error_handler(ErrorHandlerConfig::log_only());

    let err = explicit
        .on_exception(|_e| true)
        .end_on_exception()
        .to("mock:out")
        .build()
        .err()
        .expect("mixed mode should fail with an error");

    let rendered = format!("{err}");
    assert!(
        rendered.contains("mixed error handler modes"),
        "unexpected error: {err}"
    );
}
1499
/// Backoff and jitter settings given without a preceding `retry(..)` leave the
/// resulting policy with no retry configuration.
#[test]
fn test_builder_on_exception_backoff_and_jitter_without_retry_noop() {
    let clause = RouteBuilder::from("direct:start")
        .route_id("test-route")
        .on_exception(|_e| true);

    let definition = clause
        .with_backoff(Duration::from_millis(5), 3.0, Duration::from_millis(100))
        .with_jitter(0.5)
        .end_on_exception()
        .to("mock:out")
        .build()
        .expect("route should build");

    let cfg = definition
        .error_handler_config()
        .expect("error handler should be set");
    let policies = &cfg.policies;
    assert_eq!(policies.len(), 1);
    assert!(policies[0].retry.is_none());
}
1518
/// `dead_letter_channel(..)` on its own yields an error handler config with
/// the DLC URI set and no exception policies.
#[test]
fn test_builder_dead_letter_channel_without_on_exception_sets_dlc() {
    let builder = RouteBuilder::from("direct:start").route_id("test-route");
    let definition = builder
        .dead_letter_channel("log:dlc")
        .to("mock:out")
        .build()
        .expect("route should build");

    let cfg = definition
        .error_handler_config()
        .expect("error handler should be set");
    let dlc = cfg.dlc_uri.as_deref();
    assert_eq!(dlc, Some("log:dlc"));
    assert!(cfg.policies.is_empty());
}
1534
/// A second `dead_letter_channel(..)` call replaces the DLC URI while the
/// policy registered between the two calls is kept.
#[test]
fn test_builder_dead_letter_channel_called_twice_uses_latest_and_keeps_policies() {
    let with_first_dlc = RouteBuilder::from("direct:start")
        .route_id("test-route")
        .dead_letter_channel("log:first");

    let with_policy = with_first_dlc
        .on_exception(|e| matches!(e, CamelError::Io(_)))
        .retry(2)
        .end_on_exception();

    let definition = with_policy
        .dead_letter_channel("log:second")
        .to("mock:out")
        .build()
        .expect("route should build");

    let cfg = definition
        .error_handler_config()
        .expect("error handler should be set");
    assert_eq!(cfg.dlc_uri.as_deref(), Some("log:second"));
    assert_eq!(cfg.policies.len(), 1);

    let attempts = cfg.policies[0].retry.as_ref().map(|p| p.max_attempts);
    assert_eq!(attempts, Some(2));
}
1558
/// A shorthand `on_exception` clause without any dead-letter channel produces
/// an error handler config with one policy and no DLC URI.
#[test]
fn test_builder_on_exception_without_dlc_defaults_to_log_only() {
    let clause = RouteBuilder::from("direct:start")
        .route_id("test-route")
        .on_exception(|e| matches!(e, CamelError::ProcessorError(_)));

    let definition = clause
        .retry(1)
        .end_on_exception()
        .to("mock:out")
        .build()
        .expect("route should build");

    let cfg = definition
        .error_handler_config()
        .expect("error handler should be set");
    assert!(cfg.dlc_uri.is_none());
    assert_eq!(cfg.policies.len(), 1);
}
1576
/// Calling `error_handler(..)` twice still builds, and the config from the
/// second call is the one stored on the route.
#[test]
fn test_builder_error_handler_explicit_overwrite_stays_explicit_mode() {
    let builder = RouteBuilder::from("direct:start").route_id("test-route");

    let definition = builder
        .error_handler(ErrorHandlerConfig::dead_letter_channel("log:first"))
        .error_handler(ErrorHandlerConfig::dead_letter_channel("log:second"))
        .to("mock:out")
        .build()
        .expect("route should build");

    let cfg = definition
        .error_handler_config()
        .expect("error handler should be set");
    let dlc = cfg.dlc_uri.as_deref();
    assert_eq!(dlc, Some("log:second"));
}
1595
/// `split(..)` enters a nested builder and `end_split()` returns to the route
/// builder, so a following `to(..)` lands at the top level.
///
/// Besides the step count, the kind and order of the top-level steps are
/// pinned: the split first, then the trailing `To("mock:final")`.
#[test]
fn test_split_builder_typestate() {
    use camel_api::splitter::{SplitterConfig, split_body_lines};

    let definition = RouteBuilder::from("timer:test?period=1000")
        .route_id("test-route")
        .split(SplitterConfig::new(split_body_lines()))
        .to("mock:per-fragment")
        .end_split()
        .to("mock:final")
        .build()
        .unwrap();

    let steps = definition.steps();
    assert_eq!(steps.len(), 2);
    // The per-fragment `to` must be nested inside the split, not at top level.
    assert!(matches!(&steps[0], BuilderStep::Split { .. }));
    assert!(matches!(&steps[1], BuilderStep::To(uri) if uri == "mock:final"));
}
1615
/// Steps added between `split(..)` and `end_split()` are collected inside the
/// single resulting `Split` step.
#[test]
fn test_split_builder_steps_collected() {
    use camel_api::splitter::{SplitterConfig, split_body_lines};

    let splitter = SplitterConfig::new(split_body_lines());
    let definition = RouteBuilder::from("timer:test?period=1000")
        .route_id("test-route")
        .split(splitter)
        .set_header("fragment", Value::String("yes".into()))
        .to("mock:per-fragment")
        .end_split()
        .build()
        .unwrap();

    let top_level = definition.steps();
    assert_eq!(top_level.len(), 1);

    let nested = match &top_level[0] {
        BuilderStep::Split { steps, .. } => steps,
        other => panic!("Expected Split, got {:?}", other),
    };
    // One step for `set_header`, one for `to`.
    assert_eq!(nested.len(), 2);
}
1638
/// The `SplitterConfig` passed to `split(..)` reaches the built `Split` step
/// with its parallel flag, parallel limit and aggregation strategy intact.
#[test]
fn test_split_builder_config_propagated() {
    use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};

    let splitter = SplitterConfig::new(split_body_lines())
        .parallel(true)
        .parallel_limit(4)
        .aggregation(AggregationStrategy::CollectAll);

    let definition = RouteBuilder::from("timer:test?period=1000")
        .route_id("test-route")
        .split(splitter)
        .to("mock:per-fragment")
        .end_split()
        .build()
        .unwrap();

    let top_level = definition.steps();
    let stored = match &top_level[0] {
        BuilderStep::Split { config, .. } => config,
        other => panic!("Expected Split, got {:?}", other),
    };

    assert!(stored.parallel);
    assert_eq!(stored.parallel_limit, Some(4));
    assert!(matches!(
        stored.aggregation,
        AggregationStrategy::CollectAll
    ));
}
1668
/// `aggregate(..)` appends exactly one `Aggregate` step to the route.
#[test]
fn test_aggregate_builder_adds_step() {
    use camel_api::aggregator::AggregatorConfig;
    use camel_core::route::BuilderStep;

    let aggregator = AggregatorConfig::correlate_by("key")
        .complete_when_size(2)
        .build();

    let definition = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .aggregate(aggregator)
        .build()
        .unwrap();

    let steps = definition.steps();
    assert_eq!(steps.len(), 1);
    assert!(matches!(steps[0], BuilderStep::Aggregate { .. }));
}
1690
/// `aggregate(..)` is available inside a split block and is recorded as the
/// first nested step of the resulting `Split`.
#[test]
fn test_aggregate_in_split_builder() {
    use camel_api::aggregator::AggregatorConfig;
    use camel_api::splitter::{SplitterConfig, split_body_lines};
    use camel_core::route::BuilderStep;

    let aggregator = AggregatorConfig::correlate_by("key")
        .complete_when_size(1)
        .build();

    let definition = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .split(SplitterConfig::new(split_body_lines()))
        .aggregate(aggregator)
        .end_split()
        .build()
        .unwrap();

    let top_level = definition.steps();
    assert_eq!(top_level.len(), 1);

    match &top_level[0] {
        BuilderStep::Split { steps: nested, .. } => {
            assert!(matches!(nested[0], BuilderStep::Aggregate { .. }));
        }
        _ => panic!("expected Split step"),
    }
}
1716
/// `set_body(..)` with a static value is recorded as a `Processor` step.
#[test]
fn test_builder_set_body_static_adds_processor() {
    let builder = RouteBuilder::from("timer:tick").route_id("test-route");
    let definition = builder.set_body("fixed").build().unwrap();

    let steps = definition.steps();
    let first = &steps[0];
    assert!(matches!(first, BuilderStep::Processor(_)));
}
1728
/// `set_body_fn(..)` with a closure is recorded as a `Processor` step.
#[test]
fn test_builder_set_body_fn_adds_processor() {
    let builder = RouteBuilder::from("timer:tick").route_id("test-route");
    let definition = builder
        .set_body_fn(|_ex: &Exchange| Body::Text("dynamic".into()))
        .build()
        .unwrap();

    let steps = definition.steps();
    let first = &steps[0];
    assert!(matches!(first, BuilderStep::Processor(_)));
}
1738
/// Routes built with `transform(..)` and with `set_body(..)` for the same
/// value end up with the same number of steps.
#[test]
fn transform_alias_produces_same_as_set_body() {
    let via_transform = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .transform("hello")
        .build()
        .unwrap();
    let transform_count = via_transform.steps().len();

    let via_set_body = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .set_body("hello")
        .build()
        .unwrap();
    let set_body_count = via_set_body.steps().len();

    assert_eq!(transform_count, set_body_count);
}
1755
/// `set_header_fn(..)` with a closure is recorded as a `Processor` step.
#[test]
fn test_builder_set_header_fn_adds_processor() {
    let builder = RouteBuilder::from("timer:tick").route_id("test-route");
    let definition = builder
        .set_header_fn("k", |_ex: &Exchange| Value::String("v".into()))
        .build()
        .unwrap();

    let steps = definition.steps();
    let first = &steps[0];
    assert!(matches!(first, BuilderStep::Processor(_)));
}
1765
1766 #[tokio::test]
1767 async fn test_set_body_static_processor_works() {
1768 use camel_core::route::compose_pipeline;
1769 let def = RouteBuilder::from("t:t")
1770 .route_id("test-route")
1771 .set_body("replaced")
1772 .build()
1773 .unwrap();
1774 let pipeline = compose_pipeline(
1775 def.steps()
1776 .iter()
1777 .filter_map(|s| {
1778 if let BuilderStep::Processor(p) = s {
1779 Some(p.clone())
1780 } else {
1781 None
1782 }
1783 })
1784 .collect(),
1785 );
1786 let exchange = Exchange::new(Message::new("original"));
1787 let result = pipeline.oneshot(exchange).await.unwrap();
1788 assert_eq!(result.input.body.as_text(), Some("replaced"));
1789 }
1790
1791 #[tokio::test]
1792 async fn test_set_body_fn_processor_works() {
1793 use camel_core::route::compose_pipeline;
1794 let def = RouteBuilder::from("t:t")
1795 .route_id("test-route")
1796 .set_body_fn(|ex: &Exchange| {
1797 Body::Text(ex.input.body.as_text().unwrap_or("").to_uppercase())
1798 })
1799 .build()
1800 .unwrap();
1801 let pipeline = compose_pipeline(
1802 def.steps()
1803 .iter()
1804 .filter_map(|s| {
1805 if let BuilderStep::Processor(p) = s {
1806 Some(p.clone())
1807 } else {
1808 None
1809 }
1810 })
1811 .collect(),
1812 );
1813 let exchange = Exchange::new(Message::new("hello"));
1814 let result = pipeline.oneshot(exchange).await.unwrap();
1815 assert_eq!(result.input.body.as_text(), Some("HELLO"));
1816 }
1817
1818 #[tokio::test]
1819 async fn test_set_header_fn_processor_works() {
1820 use camel_core::route::compose_pipeline;
1821 let def = RouteBuilder::from("t:t")
1822 .route_id("test-route")
1823 .set_header_fn("echo", |ex: &Exchange| {
1824 ex.input
1825 .body
1826 .as_text()
1827 .map(|t| Value::String(t.into()))
1828 .unwrap_or(Value::Null)
1829 })
1830 .build()
1831 .unwrap();
1832 let pipeline = compose_pipeline(
1833 def.steps()
1834 .iter()
1835 .filter_map(|s| {
1836 if let BuilderStep::Processor(p) = s {
1837 Some(p.clone())
1838 } else {
1839 None
1840 }
1841 })
1842 .collect(),
1843 );
1844 let exchange = Exchange::new(Message::new("ping"));
1845 let result = pipeline.oneshot(exchange).await.unwrap();
1846 assert_eq!(
1847 result.input.header("echo"),
1848 Some(&Value::String("ping".into()))
1849 );
1850 }
1851
/// `filter(..)` enters a nested builder and `end_filter()` returns to the
/// route builder, so a following `to(..)` lands at the top level.
///
/// In addition to the build succeeding, the resulting steps are checked: the
/// filter first, then the trailing `To("mock:outer")`.
#[test]
fn test_filter_builder_typestate() {
    let result = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
        .route_id("test-route")
        .filter(|_ex| true)
        .to("mock:inner")
        .end_filter()
        .to("mock:outer")
        .build();
    assert!(result.is_ok());

    let definition = result.unwrap();
    let steps = definition.steps();
    assert_eq!(steps.len(), 2);
    // `mock:inner` must be nested inside the filter, not at top level.
    assert!(matches!(&steps[0], BuilderStep::Filter { .. }));
    assert!(matches!(&steps[1], BuilderStep::To(uri) if uri == "mock:outer"));
}
1865
/// A filter block with one inner step yields a single top-level `Filter` step.
#[test]
fn test_filter_builder_steps_collected() {
    let filter_block = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
        .route_id("test-route")
        .filter(|_ex| true);

    let definition = filter_block
        .to("mock:inner")
        .end_filter()
        .build()
        .unwrap();

    let steps = definition.steps();
    assert_eq!(steps.len(), 1);
    assert!(matches!(&steps[0], BuilderStep::Filter { .. }));
}
1879
/// `wire_tap(..)` adds a `WireTap` step carrying its URI, followed by the
/// regular `To` step added afterwards.
#[test]
fn test_wire_tap_builder_adds_step() {
    let builder = RouteBuilder::from("timer:tick").route_id("test-route");
    let definition = builder
        .wire_tap("mock:tap")
        .to("mock:result")
        .build()
        .unwrap();

    let steps = definition.steps();
    assert_eq!(steps.len(), 2);

    let tap_recorded = matches!(&steps[0], BuilderStep::WireTap { uri } if uri == "mock:tap");
    assert!(tap_recorded);

    let to_recorded = matches!(&steps[1], BuilderStep::To(uri) if uri == "mock:result");
    assert!(to_recorded);
}
1895
/// `multicast()` enters a nested builder and `end_multicast()` returns to the
/// route builder, so a following `to(..)` lands at the top level.
///
/// Besides the step count, the kind and order of the top-level steps are
/// pinned: the multicast first, then the trailing `To("mock:result")`.
#[test]
fn test_multicast_builder_typestate() {
    let definition = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .multicast()
        .to("direct:a")
        .to("direct:b")
        .end_multicast()
        .to("mock:result")
        .build()
        .unwrap();

    let steps = definition.steps();
    assert_eq!(steps.len(), 2);
    // Both `direct:*` targets must be nested inside the multicast.
    assert!(matches!(&steps[0], BuilderStep::Multicast { .. }));
    assert!(matches!(&steps[1], BuilderStep::To(uri) if uri == "mock:result"));
}
1912
/// Endpoints added between `multicast()` and `end_multicast()` are collected
/// as nested steps of the `Multicast` step.
#[test]
fn test_multicast_builder_steps_collected() {
    let multicast_block = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .multicast();

    let definition = multicast_block
        .to("direct:a")
        .to("direct:b")
        .end_multicast()
        .build()
        .unwrap();

    let top_level = definition.steps();
    let nested = match &top_level[0] {
        BuilderStep::Multicast { steps, .. } => steps,
        other => panic!("Expected Multicast, got {:?}", other),
    };
    assert_eq!(nested.len(), 2);
}
1931
/// `concurrent(16)` stores a bounded concurrent model as the route's
/// concurrency override.
#[test]
fn test_builder_concurrent_sets_concurrency() {
    use camel_component::ConcurrencyModel;

    let builder = RouteBuilder::from("http://0.0.0.0:8080/test").route_id("test-route");
    let definition = builder.concurrent(16).to("log:info").build().unwrap();

    let expected = ConcurrencyModel::Concurrent { max: Some(16) };
    assert_eq!(definition.concurrency_override(), Some(&expected));
}
1950
/// `concurrent(0)` is stored as a concurrent model with no maximum.
#[test]
fn test_builder_concurrent_zero_means_unbounded() {
    use camel_component::ConcurrencyModel;

    let builder = RouteBuilder::from("http://0.0.0.0:8080/test").route_id("test-route");
    let definition = builder.concurrent(0).to("log:info").build().unwrap();

    let expected = ConcurrencyModel::Concurrent { max: None };
    assert_eq!(definition.concurrency_override(), Some(&expected));
}
1967
/// `sequential()` stores the sequential model as the route's concurrency
/// override.
#[test]
fn test_builder_sequential_sets_concurrency() {
    use camel_component::ConcurrencyModel;

    let builder = RouteBuilder::from("http://0.0.0.0:8080/test").route_id("test-route");
    let definition = builder.sequential().to("log:info").build().unwrap();

    let expected = ConcurrencyModel::Sequential;
    assert_eq!(definition.concurrency_override(), Some(&expected));
}
1984
/// Without `concurrent(..)` or `sequential()`, the built route carries no
/// concurrency override.
#[test]
fn test_builder_default_concurrency_is_none() {
    let builder = RouteBuilder::from("timer:tick").route_id("test-route");
    let definition = builder.to("log:info").build().unwrap();

    let override_model = definition.concurrency_override();
    assert_eq!(override_model, None);
}
1995
/// The id passed to `route_id(..)` is the one reported by the built route.
#[test]
fn test_builder_route_id_sets_id() {
    let builder = RouteBuilder::from("timer:tick").route_id("my-route");
    let definition = builder.build().unwrap();

    let id = definition.route_id();
    assert_eq!(id, "my-route");
}
2007
/// `build()` on a builder that never received a `route_id(..)` returns an
/// error whose message mentions `route_id`.
#[test]
fn test_build_without_route_id_fails() {
    let outcome = RouteBuilder::from("timer:tick?period=1000")
        .to("log:info")
        .build();

    let err = outcome
        .err()
        .expect("build() should fail without route_id")
        .to_string();

    assert!(
        err.contains("route_id"),
        "error should mention route_id, got: {}",
        err
    );
}
2023
/// `auto_startup(false)` is reflected on the built route.
#[test]
fn test_builder_auto_startup_false() {
    let builder = RouteBuilder::from("timer:tick").route_id("test-route");
    let definition = builder.auto_startup(false).build().unwrap();

    let starts_automatically = definition.auto_startup();
    assert!(!starts_automatically);
}
2034
/// A custom `startup_order(..)` value is reflected on the built route.
#[test]
fn test_builder_startup_order_custom() {
    let builder = RouteBuilder::from("timer:tick").route_id("test-route");
    let definition = builder.startup_order(50).build().unwrap();

    let order = definition.startup_order();
    assert_eq!(order, 50);
}
2045
/// A route built with only an id reports that id, auto-startup enabled and a
/// startup order of 1000.
#[test]
fn test_builder_defaults() {
    let builder = RouteBuilder::from("timer:tick").route_id("test-route");
    let definition = builder.build().unwrap();

    assert_eq!(definition.route_id(), "test-route");

    let starts_automatically = definition.auto_startup();
    assert!(starts_automatically);

    let order = definition.startup_order();
    assert_eq!(order, 1000);
}
2057
/// A choice block with one `when` branch and no `otherwise` yields a single
/// `Choice` step with one `when` and no fallback.
#[test]
fn test_choice_builder_single_when() {
    let choice_block = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .choice();

    let definition = choice_block
        .when(|ex: &Exchange| ex.input.header("type").is_some())
        .to("mock:typed")
        .end_when()
        .end_choice()
        .build()
        .unwrap();

    let steps = definition.steps();
    assert_eq!(steps.len(), 1);

    let single_when_no_fallback = matches!(
        &steps[0],
        BuilderStep::Choice { whens, otherwise } if whens.len() == 1 && otherwise.is_none()
    );
    assert!(single_when_no_fallback);
}
2077
/// A choice block with one `when` branch and an `otherwise` branch yields a
/// `Choice` step holding one `when` and a fallback.
#[test]
fn test_choice_builder_when_otherwise() {
    let choice_block = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .choice();

    let with_when = choice_block
        .when(|ex: &Exchange| ex.input.header("a").is_some())
        .to("mock:a")
        .end_when();

    let definition = with_when
        .otherwise()
        .to("mock:fallback")
        .end_otherwise()
        .end_choice()
        .build()
        .unwrap();

    let steps = definition.steps();
    let one_when_with_fallback = matches!(
        &steps[0],
        BuilderStep::Choice { whens, otherwise } if whens.len() == 1 && otherwise.is_some()
    );
    assert!(one_when_with_fallback);
}
2097
/// Two consecutive `when` branches inside one choice block are both recorded
/// on the resulting `Choice` step.
#[test]
fn test_choice_builder_multiple_whens() {
    let choice_block = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .choice();

    let with_first = choice_block
        .when(|ex: &Exchange| ex.input.header("a").is_some())
        .to("mock:a")
        .end_when();

    let definition = with_first
        .when(|ex: &Exchange| ex.input.header("b").is_some())
        .to("mock:b")
        .end_when()
        .end_choice()
        .build()
        .unwrap();

    let steps = definition.steps();
    let has_two_whens = matches!(
        &steps[0],
        BuilderStep::Choice { whens, .. } if whens.len() == 2
    );
    assert!(has_two_whens);
}
2117
/// A `to(..)` added after `end_choice()` becomes the second top-level step.
#[test]
fn test_choice_step_after_choice() {
    let closed_choice = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .choice()
        .when(|_ex: &Exchange| true)
        .to("mock:inner")
        .end_when()
        .end_choice();

    let definition = closed_choice.to("mock:outer").build().unwrap();

    let steps = definition.steps();
    assert_eq!(steps.len(), 2);

    let outer_recorded = matches!(&steps[1], BuilderStep::To(uri) if uri == "mock:outer");
    assert!(outer_recorded);
}
2134
/// A throttle block with one inner step yields a single top-level `Throttle`
/// step.
#[test]
fn test_throttle_builder_typestate() {
    let throttle_block = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .throttle(10, std::time::Duration::from_secs(1));

    let definition = throttle_block
        .to("mock:result")
        .end_throttle()
        .build()
        .unwrap();

    let steps = definition.steps();
    assert_eq!(steps.len(), 1);
    assert!(matches!(&steps[0], BuilderStep::Throttle { .. }));
}
2153
/// The strategy chosen inside the throttle block is stored on the `Throttle`
/// step's config.
#[test]
fn test_throttle_builder_with_strategy() {
    let throttle_block = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .throttle(10, std::time::Duration::from_secs(1));

    let definition = throttle_block
        .strategy(ThrottleStrategy::Reject)
        .to("mock:result")
        .end_throttle()
        .build()
        .unwrap();

    let steps = definition.steps();
    let stored = match &steps[0] {
        BuilderStep::Throttle { config, .. } => config,
        _ => panic!("Expected Throttle step"),
    };
    assert_eq!(stored.strategy, ThrottleStrategy::Reject);
}
2171
/// Steps added between `throttle(..)` and `end_throttle()` are collected as
/// nested steps of the `Throttle` step.
#[test]
fn test_throttle_builder_steps_collected() {
    let throttle_block = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .throttle(5, std::time::Duration::from_secs(1));

    let definition = throttle_block
        .set_header("throttled", Value::Bool(true))
        .to("mock:throttled")
        .end_throttle()
        .build()
        .unwrap();

    let top_level = definition.steps();
    let nested = match &top_level[0] {
        BuilderStep::Throttle { steps, .. } => steps,
        other => panic!("Expected Throttle, got {:?}", other),
    };
    // One step for `set_header`, one for `to`.
    assert_eq!(nested.len(), 2);
}
2190
/// A `to(..)` added after `end_throttle()` becomes the second top-level step.
#[test]
fn test_throttle_step_after_throttle() {
    let closed_throttle = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .throttle(10, std::time::Duration::from_secs(1))
        .to("mock:inner")
        .end_throttle();

    let definition = closed_throttle.to("mock:outer").build().unwrap();

    let steps = definition.steps();
    assert_eq!(steps.len(), 2);

    let outer_recorded = matches!(&steps[1], BuilderStep::To(uri) if uri == "mock:outer");
    assert!(outer_recorded);
}
2206
/// A load-balance block with two targets yields a single top-level
/// `LoadBalance` step.
#[test]
fn test_load_balance_builder_typestate() {
    let balancer_block = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .load_balance()
        .round_robin();

    let definition = balancer_block
        .to("mock:a")
        .to("mock:b")
        .end_load_balance()
        .build()
        .unwrap();

    let steps = definition.steps();
    assert_eq!(steps.len(), 1);
    assert!(matches!(&steps[0], BuilderStep::LoadBalance { .. }));
}
2227
/// Selecting `random()` inside the load-balance block stores the random
/// strategy on the `LoadBalance` step's config.
#[test]
fn test_load_balance_builder_with_strategy() {
    let balancer_block = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .load_balance();

    let definition = balancer_block
        .random()
        .to("mock:result")
        .end_load_balance()
        .build()
        .unwrap();

    let steps = definition.steps();
    let stored = match &steps[0] {
        BuilderStep::LoadBalance { config, .. } => config,
        _ => panic!("Expected LoadBalance step"),
    };
    assert_eq!(stored.strategy, LoadBalanceStrategy::Random);
}
2245
/// Steps added between `load_balance()` and `end_load_balance()` are collected
/// as nested steps of the `LoadBalance` step.
#[test]
fn test_load_balance_builder_steps_collected() {
    let balancer_block = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .load_balance();

    let definition = balancer_block
        .set_header("lb", Value::Bool(true))
        .to("mock:a")
        .end_load_balance()
        .build()
        .unwrap();

    let top_level = definition.steps();
    let nested = match &top_level[0] {
        BuilderStep::LoadBalance { steps, .. } => steps,
        other => panic!("Expected LoadBalance, got {:?}", other),
    };
    // One step for `set_header`, one for `to`.
    assert_eq!(nested.len(), 2);
}
2264
/// A `to(..)` added after `end_load_balance()` becomes the second top-level
/// step.
#[test]
fn test_load_balance_step_after_load_balance() {
    let closed_balancer = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .load_balance()
        .to("mock:inner")
        .end_load_balance();

    let definition = closed_balancer.to("mock:outer").build().unwrap();

    let steps = definition.steps();
    assert_eq!(steps.len(), 2);

    let outer_recorded = matches!(&steps[1], BuilderStep::To(uri) if uri == "mock:outer");
    assert!(outer_recorded);
}
2280
/// `dynamic_router(..)` appends exactly one `DynamicRouter` step.
#[test]
fn test_dynamic_router_builder() {
    let builder = RouteBuilder::from("timer:tick").route_id("test-route");
    let definition = builder
        .dynamic_router(Arc::new(|_| Some("mock:result".to_string())))
        .build()
        .unwrap();

    let steps = definition.steps();
    assert_eq!(steps.len(), 1);
    assert!(matches!(&steps[0], BuilderStep::DynamicRouter { .. }));
}
2297
/// `dynamic_router_with_config(..)` stores the supplied config on the
/// `DynamicRouter` step, including its iteration limit and cache size.
#[test]
fn test_dynamic_router_builder_with_config() {
    let router_config = DynamicRouterConfig::new(Arc::new(|_| Some("mock:a".to_string())))
        .max_iterations(100)
        .cache_size(500);

    let builder = RouteBuilder::from("timer:tick").route_id("test-route");
    let definition = builder
        .dynamic_router_with_config(router_config)
        .build()
        .unwrap();

    let steps = definition.steps();
    assert_eq!(steps.len(), 1);

    let stored = match &steps[0] {
        BuilderStep::DynamicRouter { config } => config,
        _ => panic!("Expected DynamicRouter step"),
    };
    assert_eq!(stored.max_iterations, 100);
    assert_eq!(stored.cache_size, 500);
}
2318
/// A `to(..)` chained directly after `dynamic_router(..)` is recorded as the
/// second step, after the `DynamicRouter` step.
#[test]
fn test_dynamic_router_step_after_router() {
    let with_router = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .dynamic_router(Arc::new(|_| Some("mock:inner".to_string())));

    let definition = with_router.to("mock:outer").build().unwrap();

    let steps = definition.steps();
    assert_eq!(steps.len(), 2);
    assert!(matches!(&steps[0], BuilderStep::DynamicRouter { .. }));

    let outer_recorded = matches!(&steps[1], BuilderStep::To(uri) if uri == "mock:outer");
    assert!(outer_recorded);
}
2336
/// `routing_slip(..)` records a `RoutingSlip` step as the first step.
#[test]
fn routing_slip_builder_creates_step() {
    use camel_api::RoutingSlipExpression;

    let slip: RoutingSlipExpression = Arc::new(|_| Some("direct:a,direct:b".to_string()));

    let builder = RouteBuilder::from("direct:start").route_id("routing-slip-test");
    let route = builder.routing_slip(slip).build().unwrap();

    let is_routing_slip = matches!(route.steps()[0], BuilderStep::RoutingSlip { .. });
    assert!(is_routing_slip, "Expected RoutingSlip step");
}
2354
/// `routing_slip_with_config(..)` stores the supplied config on the
/// `RoutingSlip` step: delimiter, cache size and the ignore-invalid flag.
#[test]
fn routing_slip_with_config_builder_creates_step() {
    use camel_api::RoutingSlipConfig;

    let slip_config = RoutingSlipConfig::new(Arc::new(|_| Some("mock:a".to_string())))
        .uri_delimiter("|")
        .cache_size(50)
        .ignore_invalid_endpoints(true);

    let builder = RouteBuilder::from("direct:start").route_id("routing-slip-config-test");
    let route = builder
        .routing_slip_with_config(slip_config)
        .build()
        .unwrap();

    let steps = route.steps();
    let stored = match &steps[0] {
        BuilderStep::RoutingSlip { config } => config,
        _ => panic!("Expected RoutingSlip step"),
    };
    assert_eq!(stored.uri_delimiter, "|");
    assert_eq!(stored.cache_size, 50);
    assert!(stored.ignore_invalid_endpoints);
}
2378
/// `marshal("json")` is recorded as a `Processor` step.
#[test]
fn test_builder_marshal_adds_processor_step() {
    let builder = RouteBuilder::from("timer:tick").route_id("test-route");
    let definition = builder.marshal("json").build().unwrap();

    let steps = definition.steps();
    let first = &steps[0];
    assert!(matches!(first, BuilderStep::Processor(_)));
}
2388
/// `unmarshal("json")` is recorded as a `Processor` step.
#[test]
fn test_builder_unmarshal_adds_processor_step() {
    let builder = RouteBuilder::from("timer:tick").route_id("test-route");
    let definition = builder.unmarshal("json").build().unwrap();

    let steps = definition.steps();
    let first = &steps[0];
    assert!(matches!(first, BuilderStep::Processor(_)));
}
2398
/// Building a route that marshals with the unsupported format `"csv"` panics
/// with an "unknown data format" message.
#[test]
#[should_panic(expected = "unknown data format: 'csv'")]
fn test_builder_marshal_panics_on_unknown_format() {
    let builder = RouteBuilder::from("timer:tick").route_id("test-route");
    // The result is irrelevant; the chain is expected to panic before returning.
    let _ = builder.marshal("csv").build();
}
2407
/// Building a route that unmarshals with the unsupported format `"csv"` panics
/// with an "unknown data format" message.
#[test]
#[should_panic(expected = "unknown data format: 'csv'")]
fn test_builder_unmarshal_panics_on_unknown_format() {
    let builder = RouteBuilder::from("timer:tick").route_id("test-route");
    // The result is irrelevant; the chain is expected to panic before returning.
    let _ = builder.unmarshal("csv").build();
}
2416}