1use camel_api::DelayConfig;
2use camel_api::aggregator::{
3 AggregationStrategy, AggregatorConfig, CompletionCondition, CompletionMode, CorrelationStrategy,
4};
5use camel_api::body::Body;
6use camel_api::body_converter::BodyType;
7use camel_api::circuit_breaker::CircuitBreakerConfig;
8use camel_api::dynamic_router::{DynamicRouterConfig, RouterExpression};
9use camel_api::error_handler::{ErrorHandlerConfig, RedeliveryPolicy};
10use camel_api::load_balancer::LoadBalancerConfig;
11use camel_api::loop_eip::{LoopConfig, LoopMode};
12use camel_api::multicast::{MulticastConfig, MulticastStrategy};
13use camel_api::recipient_list::{RecipientListConfig, RecipientListExpression};
14use camel_api::routing_slip::{RoutingSlipConfig, RoutingSlipExpression};
15use camel_api::splitter::SplitterConfig;
16use camel_api::throttler::{ThrottleStrategy, ThrottlerConfig};
17use camel_api::{
18 BoxProcessor, CamelError, CanonicalRouteSpec, Exchange, FilterPredicate, IdentityProcessor,
19 ProcessorFn, Value,
20 runtime::{
21 CanonicalAggregateSpec, CanonicalAggregateStrategySpec, CanonicalCircuitBreakerSpec,
22 CanonicalSplitAggregationSpec, CanonicalSplitExpressionSpec, CanonicalStepSpec,
23 CanonicalWhenSpec,
24 },
25};
26use camel_component_api::ConcurrencyModel;
27use camel_core::route::{BuilderStep, DeclarativeWhenStep, RouteDefinition, WhenStep};
28use camel_processor::{
29 ConvertBodyTo, DynamicSetHeader, LogLevel, MapBody, MarshalService, SetBody, SetHeader,
30 StreamCacheService, UnmarshalService, builtin_data_format,
31};
32
33pub trait StepAccumulator: Sized {
39 fn steps_mut(&mut self) -> &mut Vec<BuilderStep>;
40
41 fn to(mut self, endpoint: impl Into<String>) -> Self {
42 self.steps_mut().push(BuilderStep::To(endpoint.into()));
43 self
44 }
45
46 fn process<F, Fut>(mut self, f: F) -> Self
47 where
48 F: Fn(Exchange) -> Fut + Send + Sync + 'static,
49 Fut: std::future::Future<Output = Result<Exchange, CamelError>> + Send + 'static,
50 {
51 let svc = ProcessorFn::new(f);
52 self.steps_mut()
53 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
54 self
55 }
56
57 fn process_fn(mut self, processor: BoxProcessor) -> Self {
58 self.steps_mut().push(BuilderStep::Processor(processor));
59 self
60 }
61
62 fn set_header(mut self, key: impl Into<String>, value: impl Into<Value>) -> Self {
63 let svc = SetHeader::new(IdentityProcessor, key, value);
64 self.steps_mut()
65 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
66 self
67 }
68
69 fn map_body<F>(mut self, mapper: F) -> Self
70 where
71 F: Fn(Body) -> Body + Clone + Send + Sync + 'static,
72 {
73 let svc = MapBody::new(IdentityProcessor, mapper);
74 self.steps_mut()
75 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
76 self
77 }
78
79 fn set_body<B>(mut self, body: B) -> Self
80 where
81 B: Into<Body> + Clone + Send + Sync + 'static,
82 {
83 let body: Body = body.into();
84 let svc = SetBody::new(IdentityProcessor, move |_ex: &Exchange| body.clone());
85 self.steps_mut()
86 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
87 self
88 }
89
90 fn transform<B>(self, body: B) -> Self
95 where
96 B: Into<Body> + Clone + Send + Sync + 'static,
97 {
98 self.set_body(body)
99 }
100
101 fn set_body_fn<F>(mut self, expr: F) -> Self
102 where
103 F: Fn(&Exchange) -> Body + Clone + Send + Sync + 'static,
104 {
105 let svc = SetBody::new(IdentityProcessor, expr);
106 self.steps_mut()
107 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
108 self
109 }
110
111 fn set_header_fn<F>(mut self, key: impl Into<String>, expr: F) -> Self
112 where
113 F: Fn(&Exchange) -> Value + Clone + Send + Sync + 'static,
114 {
115 let svc = DynamicSetHeader::new(IdentityProcessor, key, expr);
116 self.steps_mut()
117 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
118 self
119 }
120
121 fn aggregate(mut self, config: AggregatorConfig) -> Self {
122 self.steps_mut().push(BuilderStep::Aggregate { config });
123 self
124 }
125
126 fn stop(mut self) -> Self {
132 self.steps_mut().push(BuilderStep::Stop);
133 self
134 }
135
136 fn delay(mut self, duration: std::time::Duration) -> Self {
137 self.steps_mut().push(BuilderStep::Delay {
138 config: DelayConfig::from_duration(duration),
139 });
140 self
141 }
142
143 fn delay_with_header(
144 mut self,
145 duration: std::time::Duration,
146 header: impl Into<String>,
147 ) -> Self {
148 self.steps_mut().push(BuilderStep::Delay {
149 config: DelayConfig::from_duration_with_header(duration, header),
150 });
151 self
152 }
153
154 fn log(mut self, message: impl Into<String>, level: LogLevel) -> Self {
158 self.steps_mut().push(BuilderStep::Log {
159 level,
160 message: message.into(),
161 });
162 self
163 }
164
165 fn convert_body_to(mut self, target: BodyType) -> Self {
177 let svc = ConvertBodyTo::new(IdentityProcessor, target);
178 self.steps_mut()
179 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
180 self
181 }
182
183 fn stream_cache(mut self, threshold: usize) -> Self {
184 let config = camel_api::stream_cache::StreamCacheConfig::new(threshold);
185 let svc = StreamCacheService::new(IdentityProcessor, config);
186 self.steps_mut()
187 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
188 self
189 }
190
191 fn stream_cache_default(self) -> Self {
195 self.stream_cache(camel_api::stream_cache::DEFAULT_STREAM_CACHE_THRESHOLD)
196 }
197
198 fn marshal(mut self, format: impl Into<String>) -> Self {
208 let name = format.into();
209 let df =
210 builtin_data_format(&name).unwrap_or_else(|| panic!("unknown data format: '{name}'"));
211 let svc = MarshalService::new(IdentityProcessor, df);
212 self.steps_mut()
213 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
214 self
215 }
216
217 fn unmarshal(mut self, format: impl Into<String>) -> Self {
227 let name = format.into();
228 let df =
229 builtin_data_format(&name).unwrap_or_else(|| panic!("unknown data format: '{name}'"));
230 let svc = UnmarshalService::new(IdentityProcessor, df);
231 self.steps_mut()
232 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
233 self
234 }
235
236 fn validate(mut self, schema_path: impl Into<String>) -> Self {
245 let path = schema_path.into();
246 let uri = if path.starts_with("validator:") {
247 path
248 } else {
249 format!("validator:{path}")
250 };
251 self.steps_mut().push(BuilderStep::To(uri));
252 self
253 }
254
255 fn script(mut self, language: impl Into<String>, script: impl Into<String>) -> Self {
266 self.steps_mut().push(BuilderStep::Script {
267 language: language.into(),
268 script: script.into(),
269 });
270 self
271 }
272
273 fn bean(mut self, name: impl Into<String>, method: impl Into<String>) -> Self {
274 self.steps_mut().push(BuilderStep::Bean {
275 name: name.into(),
276 method: method.into(),
277 });
278 self
279 }
280}
281
282pub struct RouteBuilder {
294 from_uri: String,
295 steps: Vec<BuilderStep>,
296 error_handler: Option<ErrorHandlerConfig>,
297 error_handler_mode: ErrorHandlerMode,
298 circuit_breaker_config: Option<CircuitBreakerConfig>,
299 concurrency: Option<ConcurrencyModel>,
300 route_id: Option<String>,
301 auto_startup: Option<bool>,
302 startup_order: Option<i32>,
303}
304
305#[derive(Default)]
306enum ErrorHandlerMode {
307 #[default]
308 None,
309 ExplicitConfig,
310 Shorthand {
311 dlc_uri: Option<String>,
312 specs: Vec<OnExceptionSpec>,
313 },
314 Mixed,
315}
316
317#[derive(Clone)]
318struct OnExceptionSpec {
319 matches: std::sync::Arc<dyn Fn(&CamelError) -> bool + Send + Sync>,
320 retry: Option<RedeliveryPolicy>,
321 handled_by: Option<String>,
322}
323
324impl RouteBuilder {
325 pub fn from(endpoint: &str) -> Self {
327 Self {
328 from_uri: endpoint.to_string(),
329 steps: Vec::new(),
330 error_handler: None,
331 error_handler_mode: ErrorHandlerMode::None,
332 circuit_breaker_config: None,
333 concurrency: None,
334 route_id: None,
335 auto_startup: None,
336 startup_order: None,
337 }
338 }
339
340 pub fn filter<F>(self, predicate: F) -> FilterBuilder
344 where
345 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
346 {
347 FilterBuilder {
348 parent: self,
349 predicate: std::sync::Arc::new(predicate),
350 steps: vec![],
351 }
352 }
353
354 pub fn choice(self) -> ChoiceBuilder {
360 ChoiceBuilder {
361 parent: self,
362 whens: vec![],
363 _otherwise: None,
364 }
365 }
366
367 pub fn wire_tap(mut self, endpoint: &str) -> Self {
371 self.steps.push(BuilderStep::WireTap {
372 uri: endpoint.to_string(),
373 });
374 self
375 }
376
377 pub fn error_handler(mut self, config: ErrorHandlerConfig) -> Self {
379 self.error_handler_mode = match self.error_handler_mode {
380 ErrorHandlerMode::None | ErrorHandlerMode::ExplicitConfig => {
381 ErrorHandlerMode::ExplicitConfig
382 }
383 ErrorHandlerMode::Shorthand { .. } | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
384 };
385 self.error_handler = Some(config);
386 self
387 }
388
389 pub fn dead_letter_channel(mut self, uri: impl Into<String>) -> Self {
391 let uri = uri.into();
392 self.error_handler_mode = match self.error_handler_mode {
393 ErrorHandlerMode::None => ErrorHandlerMode::Shorthand {
394 dlc_uri: Some(uri),
395 specs: Vec::new(),
396 },
397 ErrorHandlerMode::Shorthand { specs, .. } => ErrorHandlerMode::Shorthand {
398 dlc_uri: Some(uri),
399 specs,
400 },
401 ErrorHandlerMode::ExplicitConfig | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
402 };
403 self
404 }
405
406 pub fn on_exception<F>(mut self, matches: F) -> OnExceptionBuilder
408 where
409 F: Fn(&CamelError) -> bool + Send + Sync + 'static,
410 {
411 self.error_handler_mode = match self.error_handler_mode {
412 ErrorHandlerMode::None => ErrorHandlerMode::Shorthand {
413 dlc_uri: None,
414 specs: Vec::new(),
415 },
416 ErrorHandlerMode::ExplicitConfig | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
417 shorthand @ ErrorHandlerMode::Shorthand { .. } => shorthand,
418 };
419
420 OnExceptionBuilder {
421 parent: self,
422 policy: OnExceptionSpec {
423 matches: std::sync::Arc::new(matches),
424 retry: None,
425 handled_by: None,
426 },
427 }
428 }
429
430 pub fn circuit_breaker(mut self, config: CircuitBreakerConfig) -> Self {
432 self.circuit_breaker_config = Some(config);
433 self
434 }
435
436 pub fn concurrent(mut self, max: usize) -> Self {
450 let max = if max == 0 { None } else { Some(max) };
451 self.concurrency = Some(ConcurrencyModel::Concurrent { max });
452 self
453 }
454
455 pub fn sequential(mut self) -> Self {
460 self.concurrency = Some(ConcurrencyModel::Sequential);
461 self
462 }
463
464 pub fn route_id(mut self, id: impl Into<String>) -> Self {
468 self.route_id = Some(id.into());
469 self
470 }
471
472 pub fn auto_startup(mut self, auto: bool) -> Self {
476 self.auto_startup = Some(auto);
477 self
478 }
479
480 pub fn startup_order(mut self, order: i32) -> Self {
484 self.startup_order = Some(order);
485 self
486 }
487
488 pub fn split(self, config: SplitterConfig) -> SplitBuilder {
494 SplitBuilder {
495 parent: self,
496 config,
497 steps: Vec::new(),
498 }
499 }
500
501 pub fn multicast(self) -> MulticastBuilder {
507 MulticastBuilder {
508 parent: self,
509 steps: Vec::new(),
510 config: MulticastConfig::new(),
511 }
512 }
513
514 pub fn throttle(self, max_requests: usize, period: std::time::Duration) -> ThrottleBuilder {
521 ThrottleBuilder {
522 parent: self,
523 config: ThrottlerConfig::new(max_requests, period),
524 steps: Vec::new(),
525 }
526 }
527
528 pub fn loop_count(self, count: usize) -> LoopBuilder {
530 LoopBuilder {
531 parent: self,
532 config: LoopConfig {
533 mode: LoopMode::Count(count),
534 },
535 steps: vec![],
536 }
537 }
538
539 pub fn loop_while<F>(self, predicate: F) -> LoopBuilder
541 where
542 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
543 {
544 LoopBuilder {
545 parent: self,
546 config: LoopConfig {
547 mode: LoopMode::While(std::sync::Arc::new(predicate)),
548 },
549 steps: vec![],
550 }
551 }
552
553 pub fn load_balance(self) -> LoadBalancerBuilder {
559 LoadBalancerBuilder {
560 parent: self,
561 config: LoadBalancerConfig::round_robin(),
562 steps: Vec::new(),
563 }
564 }
565
566 pub fn dynamic_router(self, expression: RouterExpression) -> Self {
582 self.dynamic_router_with_config(DynamicRouterConfig::new(expression))
583 }
584
585 pub fn dynamic_router_with_config(mut self, config: DynamicRouterConfig) -> Self {
589 self.steps.push(BuilderStep::DynamicRouter { config });
590 self
591 }
592
593 pub fn routing_slip(self, expression: RoutingSlipExpression) -> Self {
594 self.routing_slip_with_config(RoutingSlipConfig::new(expression))
595 }
596
597 pub fn routing_slip_with_config(mut self, config: RoutingSlipConfig) -> Self {
598 self.steps.push(BuilderStep::RoutingSlip { config });
599 self
600 }
601
602 pub fn recipient_list(self, expression: RecipientListExpression) -> Self {
603 self.recipient_list_with_config(RecipientListConfig::new(expression))
604 }
605
606 pub fn recipient_list_with_config(mut self, config: RecipientListConfig) -> Self {
607 self.steps.push(BuilderStep::RecipientList { config });
608 self
609 }
610
611 pub fn build(self) -> Result<RouteDefinition, CamelError> {
613 if self.from_uri.is_empty() {
614 return Err(CamelError::RouteError(
615 "route must have a 'from' URI".to_string(),
616 ));
617 }
618 let route_id = self.route_id.ok_or_else(|| {
619 CamelError::RouteError(
620 "route must have a 'route_id' — call .route_id(\"name\") on the builder"
621 .to_string(),
622 )
623 })?;
624 let resolved_error_handler = match self.error_handler_mode {
625 ErrorHandlerMode::None => self.error_handler,
626 ErrorHandlerMode::ExplicitConfig => self.error_handler,
627 ErrorHandlerMode::Mixed => {
628 return Err(CamelError::RouteError(
629 "mixed error handler modes: cannot combine .error_handler(config) with shorthand methods".into(),
630 ));
631 }
632 ErrorHandlerMode::Shorthand { dlc_uri, specs } => {
633 let mut config = if let Some(uri) = dlc_uri {
634 ErrorHandlerConfig::dead_letter_channel(uri)
635 } else {
636 ErrorHandlerConfig::log_only()
637 };
638
639 for spec in specs {
640 let matcher = spec.matches.clone();
641 let mut builder = config.on_exception(move |e| matcher(e));
642
643 if let Some(retry) = spec.retry {
644 builder = builder.retry(retry.max_attempts).with_backoff(
645 retry.initial_delay,
646 retry.multiplier,
647 retry.max_delay,
648 );
649 if retry.jitter_factor > 0.0 {
650 builder = builder.with_jitter(retry.jitter_factor);
651 }
652 }
653
654 if let Some(uri) = spec.handled_by {
655 builder = builder.handled_by(uri);
656 }
657
658 config = builder.build();
659 }
660
661 Some(config)
662 }
663 };
664
665 let definition = RouteDefinition::new(self.from_uri, self.steps);
666 let definition = if let Some(eh) = resolved_error_handler {
667 definition.with_error_handler(eh)
668 } else {
669 definition
670 };
671 let definition = if let Some(cb) = self.circuit_breaker_config {
672 definition.with_circuit_breaker(cb)
673 } else {
674 definition
675 };
676 let definition = if let Some(concurrency) = self.concurrency {
677 definition.with_concurrency(concurrency)
678 } else {
679 definition
680 };
681 let definition = definition.with_route_id(route_id);
682 let definition = if let Some(auto) = self.auto_startup {
683 definition.with_auto_startup(auto)
684 } else {
685 definition
686 };
687 let definition = if let Some(order) = self.startup_order {
688 definition.with_startup_order(order)
689 } else {
690 definition
691 };
692 Ok(definition)
693 }
694
695 pub fn build_canonical(self) -> Result<CanonicalRouteSpec, CamelError> {
697 if self.from_uri.is_empty() {
698 return Err(CamelError::RouteError(
699 "route must have a 'from' URI".to_string(),
700 ));
701 }
702 let route_id = self.route_id.ok_or_else(|| {
703 CamelError::RouteError(
704 "route must have a 'route_id' — call .route_id(\"name\") on the builder"
705 .to_string(),
706 )
707 })?;
708
709 let steps = canonicalize_steps(self.steps)?;
710 let circuit_breaker = self
711 .circuit_breaker_config
712 .map(canonicalize_circuit_breaker);
713
714 let spec = CanonicalRouteSpec {
715 route_id,
716 from: self.from_uri,
717 steps,
718 circuit_breaker,
719 version: camel_api::CANONICAL_CONTRACT_VERSION,
720 };
721 spec.validate_contract()?;
722 Ok(spec)
723 }
724}
725
726pub struct OnExceptionBuilder {
727 parent: RouteBuilder,
728 policy: OnExceptionSpec,
729}
730
731impl OnExceptionBuilder {
732 pub fn retry(mut self, max_attempts: u32) -> Self {
733 self.policy.retry = Some(RedeliveryPolicy::new(max_attempts));
734 self
735 }
736
737 pub fn with_backoff(
738 mut self,
739 initial: std::time::Duration,
740 multiplier: f64,
741 max: std::time::Duration,
742 ) -> Self {
743 if let Some(ref mut retry) = self.policy.retry {
744 retry.initial_delay = initial;
745 retry.multiplier = multiplier;
746 retry.max_delay = max;
747 }
748 self
749 }
750
751 pub fn with_jitter(mut self, jitter_factor: f64) -> Self {
752 if let Some(ref mut retry) = self.policy.retry {
753 retry.jitter_factor = jitter_factor.clamp(0.0, 1.0);
754 }
755 self
756 }
757
758 pub fn handled_by(mut self, uri: impl Into<String>) -> Self {
759 self.policy.handled_by = Some(uri.into());
760 self
761 }
762
763 pub fn end_on_exception(mut self) -> RouteBuilder {
764 if let ErrorHandlerMode::Shorthand { ref mut specs, .. } = self.parent.error_handler_mode {
765 specs.push(self.policy);
766 }
767 self.parent
768 }
769}
770
771fn canonicalize_steps(steps: Vec<BuilderStep>) -> Result<Vec<CanonicalStepSpec>, CamelError> {
772 let mut canonical = Vec::with_capacity(steps.len());
773 for step in steps {
774 canonical.push(canonicalize_step(step)?);
775 }
776 Ok(canonical)
777}
778
779fn canonicalize_step(step: BuilderStep) -> Result<CanonicalStepSpec, CamelError> {
780 match step {
781 BuilderStep::To(uri) => Ok(CanonicalStepSpec::To { uri }),
782 BuilderStep::Log { message, .. } => Ok(CanonicalStepSpec::Log { message }),
783 BuilderStep::Stop => Ok(CanonicalStepSpec::Stop),
784 BuilderStep::WireTap { uri } => Ok(CanonicalStepSpec::WireTap { uri }),
785 BuilderStep::Delay { config } => Ok(CanonicalStepSpec::Delay {
786 delay_ms: config.delay_ms,
787 dynamic_header: config.dynamic_header,
788 }),
789 BuilderStep::DeclarativeScript { expression } => {
790 Ok(CanonicalStepSpec::Script { expression })
791 }
792 BuilderStep::DeclarativeFilter { predicate, steps } => Ok(CanonicalStepSpec::Filter {
793 predicate,
794 steps: canonicalize_steps(steps)?,
795 }),
796 BuilderStep::DeclarativeChoice { whens, otherwise } => {
797 let mut canonical_whens = Vec::with_capacity(whens.len());
798 for DeclarativeWhenStep { predicate, steps } in whens {
799 canonical_whens.push(CanonicalWhenSpec {
800 predicate,
801 steps: canonicalize_steps(steps)?,
802 });
803 }
804 let otherwise = match otherwise {
805 Some(steps) => Some(canonicalize_steps(steps)?),
806 None => None,
807 };
808 Ok(CanonicalStepSpec::Choice {
809 whens: canonical_whens,
810 otherwise,
811 })
812 }
813 BuilderStep::DeclarativeSplit {
814 expression,
815 aggregation,
816 parallel,
817 parallel_limit,
818 stop_on_exception,
819 steps,
820 } => Ok(CanonicalStepSpec::Split {
821 expression: CanonicalSplitExpressionSpec::Language(expression),
822 aggregation: canonicalize_split_aggregation(aggregation)?,
823 parallel,
824 parallel_limit,
825 stop_on_exception,
826 steps: canonicalize_steps(steps)?,
827 }),
828 BuilderStep::Aggregate { config } => Ok(CanonicalStepSpec::Aggregate(
829 canonicalize_aggregate(config)?,
830 )),
831 other => {
832 let step_name = canonical_step_name(&other);
833 let detail = camel_api::canonical_contract_rejection_reason(step_name)
834 .unwrap_or("not included in canonical v1");
835 Err(CamelError::RouteError(format!(
836 "canonical v1 does not support step `{step_name}`: {detail}"
837 )))
838 }
839 }
840}
841
842fn canonicalize_split_aggregation(
843 strategy: camel_api::splitter::AggregationStrategy,
844) -> Result<CanonicalSplitAggregationSpec, CamelError> {
845 match strategy {
846 camel_api::splitter::AggregationStrategy::LastWins => {
847 Ok(CanonicalSplitAggregationSpec::LastWins)
848 }
849 camel_api::splitter::AggregationStrategy::CollectAll => {
850 Ok(CanonicalSplitAggregationSpec::CollectAll)
851 }
852 camel_api::splitter::AggregationStrategy::Custom(_) => Err(CamelError::RouteError(
853 "canonical v1 does not support custom split aggregation".to_string(),
854 )),
855 camel_api::splitter::AggregationStrategy::Original => {
856 Ok(CanonicalSplitAggregationSpec::Original)
857 }
858 }
859}
860
861fn extract_completion_fields(
862 mode: &CompletionMode,
863) -> Result<(Option<usize>, Option<u64>), CamelError> {
864 match mode {
865 CompletionMode::Single(cond) => match cond {
866 CompletionCondition::Size(n) => Ok((Some(*n), None)),
867 CompletionCondition::Timeout(d) => Ok((None, Some(d.as_millis() as u64))),
868 CompletionCondition::Predicate(_) => Err(CamelError::RouteError(
869 "canonical v1 does not support aggregate predicate completion".to_string(),
870 )),
871 },
872 CompletionMode::Any(conds) => {
873 let mut size = None;
874 let mut timeout_ms = None;
875 for cond in conds {
876 match cond {
877 CompletionCondition::Size(n) => size = Some(*n),
878 CompletionCondition::Timeout(d) => timeout_ms = Some(d.as_millis() as u64),
879 CompletionCondition::Predicate(_) => {
880 return Err(CamelError::RouteError(
881 "canonical v1 does not support aggregate predicate completion"
882 .to_string(),
883 ));
884 }
885 }
886 }
887 Ok((size, timeout_ms))
888 }
889 }
890}
891
892fn canonicalize_aggregate(config: AggregatorConfig) -> Result<CanonicalAggregateSpec, CamelError> {
893 let (completion_size, completion_timeout_ms) = extract_completion_fields(&config.completion)?;
894
895 let header = match &config.correlation {
896 CorrelationStrategy::HeaderName(h) => h.clone(),
897 CorrelationStrategy::Expression { expr, .. } => expr.clone(),
898 CorrelationStrategy::Fn(_) => {
899 return Err(CamelError::RouteError(
900 "canonical v1 does not support Fn correlation strategy".to_string(),
901 ));
902 }
903 };
904
905 let correlation_key = match &config.correlation {
906 CorrelationStrategy::HeaderName(_) => None,
907 CorrelationStrategy::Expression { expr, .. } => Some(expr.clone()),
908 CorrelationStrategy::Fn(_) => unreachable!(),
909 };
910
911 let strategy = match config.strategy {
912 AggregationStrategy::CollectAll => CanonicalAggregateStrategySpec::CollectAll,
913 AggregationStrategy::Custom(_) => {
914 return Err(CamelError::RouteError(
915 "canonical v1 does not support custom aggregate strategy".to_string(),
916 ));
917 }
918 };
919 let bucket_ttl_ms = config
920 .bucket_ttl
921 .map(|ttl| u64::try_from(ttl.as_millis()).unwrap_or(u64::MAX));
922
923 Ok(CanonicalAggregateSpec {
924 header,
925 completion_size,
926 completion_timeout_ms,
927 correlation_key,
928 force_completion_on_stop: if config.force_completion_on_stop {
929 Some(true)
930 } else {
931 None
932 },
933 discard_on_timeout: if config.discard_on_timeout {
934 Some(true)
935 } else {
936 None
937 },
938 strategy,
939 max_buckets: config.max_buckets,
940 bucket_ttl_ms,
941 })
942}
943
944fn canonicalize_circuit_breaker(config: CircuitBreakerConfig) -> CanonicalCircuitBreakerSpec {
945 CanonicalCircuitBreakerSpec {
946 failure_threshold: config.failure_threshold,
947 open_duration_ms: u64::try_from(config.open_duration.as_millis()).unwrap_or(u64::MAX),
948 }
949}
950
951fn canonical_step_name(step: &BuilderStep) -> &'static str {
952 match step {
953 BuilderStep::Processor(_) => "processor",
954 BuilderStep::To(_) => "to",
955 BuilderStep::Stop => "stop",
956 BuilderStep::Log { .. } => "log",
957 BuilderStep::DeclarativeSetHeader { .. } => "set_header",
958 BuilderStep::DeclarativeSetBody { .. } => "set_body",
959 BuilderStep::DeclarativeFilter { .. } => "filter",
960 BuilderStep::DeclarativeChoice { .. } => "choice",
961 BuilderStep::DeclarativeScript { .. } => "script",
962 BuilderStep::DeclarativeFunction { .. } => "function",
963 BuilderStep::DeclarativeSplit { .. } => "split",
964 BuilderStep::Split { .. } => "split",
965 BuilderStep::Loop { .. } | BuilderStep::DeclarativeLoop { .. } => "loop",
966 BuilderStep::Aggregate { .. } => "aggregate",
967 BuilderStep::Filter { .. } => "filter",
968 BuilderStep::Choice { .. } => "choice",
969 BuilderStep::WireTap { .. } => "wire_tap",
970 BuilderStep::Delay { .. } => "delay",
971 BuilderStep::Multicast { .. } => "multicast",
972 BuilderStep::DeclarativeLog { .. } => "log",
973 BuilderStep::Bean { .. } => "bean",
974 BuilderStep::Script { .. } => "script",
975 BuilderStep::Throttle { .. } => "throttle",
976 BuilderStep::LoadBalance { .. } => "load_balancer",
977 BuilderStep::DynamicRouter { .. } => "dynamic_router",
978 BuilderStep::RoutingSlip { .. } => "routing_slip",
979 BuilderStep::DeclarativeDynamicRouter { .. } => "declarative_dynamic_router",
980 BuilderStep::DeclarativeRoutingSlip { .. } => "declarative_routing_slip",
981 BuilderStep::RecipientList { .. } => "recipient_list",
982 BuilderStep::DeclarativeRecipientList { .. } => "declarative_recipient_list",
983 }
984}
985
986impl StepAccumulator for RouteBuilder {
987 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
988 &mut self.steps
989 }
990}
991
992pub struct SplitBuilder {
1000 parent: RouteBuilder,
1001 config: SplitterConfig,
1002 steps: Vec<BuilderStep>,
1003}
1004
1005impl SplitBuilder {
1006 pub fn filter<F>(self, predicate: F) -> FilterInSplitBuilder
1008 where
1009 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1010 {
1011 FilterInSplitBuilder {
1012 parent: self,
1013 predicate: std::sync::Arc::new(predicate),
1014 steps: vec![],
1015 }
1016 }
1017
1018 pub fn end_split(mut self) -> RouteBuilder {
1021 let split_step = BuilderStep::Split {
1022 config: self.config,
1023 steps: self.steps,
1024 };
1025 self.parent.steps.push(split_step);
1026 self.parent
1027 }
1028}
1029
1030impl StepAccumulator for SplitBuilder {
1031 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1032 &mut self.steps
1033 }
1034}
1035
1036pub struct FilterBuilder {
1038 parent: RouteBuilder,
1039 predicate: FilterPredicate,
1040 steps: Vec<BuilderStep>,
1041}
1042
1043impl FilterBuilder {
1044 pub fn end_filter(mut self) -> RouteBuilder {
1047 let step = BuilderStep::Filter {
1048 predicate: self.predicate,
1049 steps: self.steps,
1050 };
1051 self.parent.steps.push(step);
1052 self.parent
1053 }
1054}
1055
1056impl StepAccumulator for FilterBuilder {
1057 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1058 &mut self.steps
1059 }
1060}
1061
1062pub struct FilterInSplitBuilder {
1064 parent: SplitBuilder,
1065 predicate: FilterPredicate,
1066 steps: Vec<BuilderStep>,
1067}
1068
1069impl FilterInSplitBuilder {
1070 pub fn end_filter(mut self) -> SplitBuilder {
1072 let step = BuilderStep::Filter {
1073 predicate: self.predicate,
1074 steps: self.steps,
1075 };
1076 self.parent.steps.push(step);
1077 self.parent
1078 }
1079}
1080
1081impl StepAccumulator for FilterInSplitBuilder {
1082 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1083 &mut self.steps
1084 }
1085}
1086
1087pub struct ChoiceBuilder {
1094 parent: RouteBuilder,
1095 whens: Vec<WhenStep>,
1096 _otherwise: Option<Vec<BuilderStep>>,
1097}
1098
1099impl ChoiceBuilder {
1100 pub fn when<F>(self, predicate: F) -> WhenBuilder
1103 where
1104 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1105 {
1106 WhenBuilder {
1107 parent: self,
1108 predicate: std::sync::Arc::new(predicate),
1109 steps: vec![],
1110 }
1111 }
1112
1113 pub fn otherwise(self) -> OtherwiseBuilder {
1117 OtherwiseBuilder {
1118 parent: self,
1119 steps: vec![],
1120 }
1121 }
1122
1123 pub fn end_choice(mut self) -> RouteBuilder {
1127 let step = BuilderStep::Choice {
1128 whens: self.whens,
1129 otherwise: self._otherwise,
1130 };
1131 self.parent.steps.push(step);
1132 self.parent
1133 }
1134}
1135
1136pub struct WhenBuilder {
1138 parent: ChoiceBuilder,
1139 predicate: camel_api::FilterPredicate,
1140 steps: Vec<BuilderStep>,
1141}
1142
1143impl WhenBuilder {
1144 pub fn end_when(mut self) -> ChoiceBuilder {
1147 self.parent.whens.push(WhenStep {
1148 predicate: self.predicate,
1149 steps: self.steps,
1150 });
1151 self.parent
1152 }
1153}
1154
1155impl StepAccumulator for WhenBuilder {
1156 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1157 &mut self.steps
1158 }
1159}
1160
1161pub struct OtherwiseBuilder {
1163 parent: ChoiceBuilder,
1164 steps: Vec<BuilderStep>,
1165}
1166
1167impl OtherwiseBuilder {
1168 pub fn end_otherwise(self) -> ChoiceBuilder {
1170 let OtherwiseBuilder { mut parent, steps } = self;
1171 parent._otherwise = Some(steps);
1172 parent
1173 }
1174}
1175
1176impl StepAccumulator for OtherwiseBuilder {
1177 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1178 &mut self.steps
1179 }
1180}
1181
1182pub struct MulticastBuilder {
1190 parent: RouteBuilder,
1191 steps: Vec<BuilderStep>,
1192 config: MulticastConfig,
1193}
1194
1195impl MulticastBuilder {
1196 pub fn parallel(mut self, parallel: bool) -> Self {
1197 self.config = self.config.parallel(parallel);
1198 self
1199 }
1200
1201 pub fn parallel_limit(mut self, limit: usize) -> Self {
1202 self.config = self.config.parallel_limit(limit);
1203 self
1204 }
1205
1206 pub fn stop_on_exception(mut self, stop: bool) -> Self {
1207 self.config = self.config.stop_on_exception(stop);
1208 self
1209 }
1210
1211 pub fn timeout(mut self, duration: std::time::Duration) -> Self {
1212 self.config = self.config.timeout(duration);
1213 self
1214 }
1215
1216 pub fn aggregation(mut self, strategy: MulticastStrategy) -> Self {
1217 self.config = self.config.aggregation(strategy);
1218 self
1219 }
1220
1221 pub fn end_multicast(mut self) -> RouteBuilder {
1222 let step = BuilderStep::Multicast {
1223 steps: self.steps,
1224 config: self.config,
1225 };
1226 self.parent.steps.push(step);
1227 self.parent
1228 }
1229}
1230
1231impl StepAccumulator for MulticastBuilder {
1232 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1233 &mut self.steps
1234 }
1235}
1236
1237pub struct ThrottleBuilder {
1245 parent: RouteBuilder,
1246 config: ThrottlerConfig,
1247 steps: Vec<BuilderStep>,
1248}
1249
1250impl ThrottleBuilder {
1251 pub fn strategy(mut self, strategy: ThrottleStrategy) -> Self {
1257 self.config = self.config.strategy(strategy);
1258 self
1259 }
1260
1261 pub fn end_throttle(mut self) -> RouteBuilder {
1264 let step = BuilderStep::Throttle {
1265 config: self.config,
1266 steps: self.steps,
1267 };
1268 self.parent.steps.push(step);
1269 self.parent
1270 }
1271}
1272
1273impl StepAccumulator for ThrottleBuilder {
1274 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1275 &mut self.steps
1276 }
1277}
1278
1279pub struct LoopBuilder {
1281 parent: RouteBuilder,
1282 config: LoopConfig,
1283 steps: Vec<BuilderStep>,
1284}
1285
1286impl LoopBuilder {
1287 pub fn loop_count(self, count: usize) -> LoopInLoopBuilder {
1288 LoopInLoopBuilder {
1289 parent: self,
1290 config: LoopConfig {
1291 mode: LoopMode::Count(count),
1292 },
1293 steps: vec![],
1294 }
1295 }
1296
1297 pub fn loop_while<F>(self, predicate: F) -> LoopInLoopBuilder
1298 where
1299 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1300 {
1301 LoopInLoopBuilder {
1302 parent: self,
1303 config: LoopConfig {
1304 mode: LoopMode::While(std::sync::Arc::new(predicate)),
1305 },
1306 steps: vec![],
1307 }
1308 }
1309
1310 pub fn end_loop(mut self) -> RouteBuilder {
1311 let step = BuilderStep::Loop {
1312 config: self.config,
1313 steps: self.steps,
1314 };
1315 self.parent.steps.push(step);
1316 self.parent
1317 }
1318}
1319
1320impl StepAccumulator for LoopBuilder {
1321 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1322 &mut self.steps
1323 }
1324}
1325
1326pub struct LoopInLoopBuilder {
1327 parent: LoopBuilder,
1328 config: LoopConfig,
1329 steps: Vec<BuilderStep>,
1330}
1331
1332impl LoopInLoopBuilder {
1333 pub fn end_loop(mut self) -> LoopBuilder {
1334 let step = BuilderStep::Loop {
1335 config: self.config,
1336 steps: self.steps,
1337 };
1338 self.parent.steps.push(step);
1339 self.parent
1340 }
1341}
1342
1343impl StepAccumulator for LoopInLoopBuilder {
1344 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1345 &mut self.steps
1346 }
1347}
1348
1349pub struct LoadBalancerBuilder {
1357 parent: RouteBuilder,
1358 config: LoadBalancerConfig,
1359 steps: Vec<BuilderStep>,
1360}
1361
1362impl LoadBalancerBuilder {
1363 pub fn round_robin(mut self) -> Self {
1365 self.config = LoadBalancerConfig::round_robin();
1366 self
1367 }
1368
1369 pub fn random(mut self) -> Self {
1371 self.config = LoadBalancerConfig::random();
1372 self
1373 }
1374
1375 pub fn weighted(mut self, weights: Vec<(String, u32)>) -> Self {
1380 self.config = LoadBalancerConfig::weighted(weights);
1381 self
1382 }
1383
1384 pub fn failover(mut self) -> Self {
1389 self.config = LoadBalancerConfig::failover();
1390 self
1391 }
1392
1393 pub fn parallel(mut self, parallel: bool) -> Self {
1398 self.config = self.config.parallel(parallel);
1399 self
1400 }
1401
1402 pub fn end_load_balance(mut self) -> RouteBuilder {
1405 let step = BuilderStep::LoadBalance {
1406 config: self.config,
1407 steps: self.steps,
1408 };
1409 self.parent.steps.push(step);
1410 self.parent
1411 }
1412}
1413
1414impl StepAccumulator for LoadBalancerBuilder {
1415 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1416 &mut self.steps
1417 }
1418}
1419
1420#[cfg(test)]
1425mod tests {
1426 use super::*;
1427 use camel_api::error_handler::ErrorHandlerConfig;
1428 use camel_api::load_balancer::LoadBalanceStrategy;
1429 use camel_api::{Exchange, Message};
1430 use camel_core::route::BuilderStep;
1431 use std::sync::Arc;
1432 use std::time::Duration;
1433 use tower::{Service, ServiceExt};
1434
1435 #[test]
1436 fn test_builder_from_creates_definition() {
1437 let definition = RouteBuilder::from("timer:tick")
1438 .route_id("test-route")
1439 .build()
1440 .unwrap();
1441 assert_eq!(definition.from_uri(), "timer:tick");
1442 }
1443
1444 #[test]
1445 fn test_builder_empty_from_uri_errors() {
1446 let result = RouteBuilder::from("").route_id("test-route").build();
1447 assert!(result.is_err());
1448 }
1449
1450 #[test]
1451 fn test_builder_to_adds_step() {
1452 let definition = RouteBuilder::from("timer:tick")
1453 .route_id("test-route")
1454 .to("log:info")
1455 .build()
1456 .unwrap();
1457
1458 assert_eq!(definition.from_uri(), "timer:tick");
1459 assert!(matches!(&definition.steps()[0], BuilderStep::To(uri) if uri == "log:info"));
1461 }
1462
1463 #[test]
1464 fn test_builder_filter_adds_filter_step() {
1465 let definition = RouteBuilder::from("timer:tick")
1466 .route_id("test-route")
1467 .filter(|_ex| true)
1468 .to("mock:result")
1469 .end_filter()
1470 .build()
1471 .unwrap();
1472
1473 assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
1474 }
1475
1476 #[test]
1477 fn test_builder_set_header_adds_processor_step() {
1478 let definition = RouteBuilder::from("timer:tick")
1479 .route_id("test-route")
1480 .set_header("key", Value::String("value".into()))
1481 .build()
1482 .unwrap();
1483
1484 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1485 }
1486
1487 #[test]
1488 fn test_builder_map_body_adds_processor_step() {
1489 let definition = RouteBuilder::from("timer:tick")
1490 .route_id("test-route")
1491 .map_body(|body| body)
1492 .build()
1493 .unwrap();
1494
1495 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1496 }
1497
1498 #[test]
1499 fn test_builder_process_adds_processor_step() {
1500 let definition = RouteBuilder::from("timer:tick")
1501 .route_id("test-route")
1502 .process(|ex| async move { Ok(ex) })
1503 .build()
1504 .unwrap();
1505
1506 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1507 }
1508
1509 #[test]
1510 fn test_builder_chain_multiple_steps() {
1511 let definition = RouteBuilder::from("timer:tick")
1512 .route_id("test-route")
1513 .set_header("source", Value::String("timer".into()))
1514 .filter(|ex| ex.input.header("source").is_some())
1515 .to("log:info")
1516 .end_filter()
1517 .to("mock:result")
1518 .build()
1519 .unwrap();
1520
1521 assert_eq!(definition.steps().len(), 3); assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_))); assert!(matches!(&definition.steps()[1], BuilderStep::Filter { .. })); assert!(matches!(&definition.steps()[2], BuilderStep::To(uri) if uri == "mock:result"));
1525 }
1526
1527 #[test]
1528 fn test_loop_count_builder() {
1529 use camel_api::loop_eip::LoopMode;
1530
1531 let def = RouteBuilder::from("direct:start")
1532 .route_id("loop-test")
1533 .loop_count(3)
1534 .to("mock:inside")
1535 .end_loop()
1536 .to("mock:after")
1537 .build()
1538 .unwrap();
1539
1540 assert_eq!(def.steps().len(), 2);
1541 match &def.steps()[0] {
1542 BuilderStep::Loop { config, steps } => {
1543 assert!(matches!(config.mode, LoopMode::Count(3)));
1544 assert_eq!(steps.len(), 1);
1545 }
1546 other => panic!("Expected Loop, got {:?}", other),
1547 }
1548 assert!(matches!(def.steps()[1], BuilderStep::To(_)));
1549 }
1550
1551 #[test]
1552 fn test_loop_while_builder() {
1553 use camel_api::loop_eip::LoopMode;
1554
1555 let def = RouteBuilder::from("direct:start")
1556 .route_id("loop-while-test")
1557 .loop_while(|_ex| true)
1558 .to("mock:retry")
1559 .end_loop()
1560 .build()
1561 .unwrap();
1562
1563 assert_eq!(def.steps().len(), 1);
1564 match &def.steps()[0] {
1565 BuilderStep::Loop { config, steps } => {
1566 assert!(matches!(config.mode, LoopMode::While(_)));
1567 assert_eq!(steps.len(), 1);
1568 }
1569 other => panic!("Expected Loop, got {:?}", other),
1570 }
1571 }
1572
1573 #[test]
1574 fn test_nested_loop_builder() {
1575 use camel_api::loop_eip::LoopMode;
1576
1577 let def = RouteBuilder::from("direct:start")
1578 .route_id("nested-loop-test")
1579 .loop_count(2)
1580 .to("mock:outer")
1581 .loop_count(3)
1582 .to("mock:inner")
1583 .end_loop()
1584 .end_loop()
1585 .to("mock:after")
1586 .build()
1587 .unwrap();
1588
1589 assert_eq!(def.steps().len(), 2);
1590 match &def.steps()[0] {
1591 BuilderStep::Loop { steps, .. } => {
1592 assert_eq!(steps.len(), 2);
1593 match &steps[1] {
1594 BuilderStep::Loop {
1595 config,
1596 steps: inner_steps,
1597 } => {
1598 assert!(matches!(config.mode, LoopMode::Count(3)));
1599 assert_eq!(inner_steps.len(), 1);
1600 }
1601 other => panic!("Expected nested Loop, got {:?}", other),
1602 }
1603 }
1604 other => panic!("Expected outer Loop, got {:?}", other),
1605 }
1606 }
1607
1608 #[tokio::test]
1613 async fn test_set_header_processor_works() {
1614 let mut svc = SetHeader::new(IdentityProcessor, "greeting", Value::String("hello".into()));
1615 let exchange = Exchange::new(Message::new("test"));
1616 let result = svc.call(exchange).await.unwrap();
1617 assert_eq!(
1618 result.input.header("greeting"),
1619 Some(&Value::String("hello".into()))
1620 );
1621 }
1622
1623 #[tokio::test]
1624 async fn test_filter_processor_passes() {
1625 use camel_api::BoxProcessorExt;
1626 use camel_processor::FilterService;
1627
1628 let sub = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
1629 let mut svc =
1630 FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
1631 let exchange = Exchange::new(Message::new("pass"));
1632 let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
1633 assert_eq!(result.input.body.as_text(), Some("pass"));
1634 }
1635
1636 #[tokio::test]
1637 async fn test_filter_processor_blocks() {
1638 use camel_api::BoxProcessorExt;
1639 use camel_processor::FilterService;
1640
1641 let sub = BoxProcessor::from_fn(|_ex| {
1642 Box::pin(async move { Err(CamelError::ProcessorError("should not reach".into())) })
1643 });
1644 let mut svc =
1645 FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
1646 let exchange = Exchange::new(Message::new("reject"));
1647 let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
1648 assert_eq!(result.input.body.as_text(), Some("reject"));
1649 }
1650
1651 #[tokio::test]
1652 async fn test_map_body_processor_works() {
1653 let mapper = MapBody::new(IdentityProcessor, |body: Body| {
1654 if let Some(text) = body.as_text() {
1655 Body::Text(text.to_uppercase())
1656 } else {
1657 body
1658 }
1659 });
1660 let exchange = Exchange::new(Message::new("hello"));
1661 let result = mapper.oneshot(exchange).await.unwrap();
1662 assert_eq!(result.input.body.as_text(), Some("HELLO"));
1663 }
1664
1665 #[tokio::test]
1666 async fn test_process_custom_processor_works() {
1667 let processor = ProcessorFn::new(|mut ex: Exchange| async move {
1668 ex.set_property("custom", Value::Bool(true));
1669 Ok(ex)
1670 });
1671 let exchange = Exchange::new(Message::default());
1672 let result = processor.oneshot(exchange).await.unwrap();
1673 assert_eq!(result.property("custom"), Some(&Value::Bool(true)));
1674 }
1675
1676 #[tokio::test]
1681 async fn test_compose_pipeline_runs_steps_in_order() {
1682 use camel_core::route::compose_pipeline;
1683
1684 let processors = vec![
1685 BoxProcessor::new(SetHeader::new(
1686 IdentityProcessor,
1687 "step",
1688 Value::String("one".into()),
1689 )),
1690 BoxProcessor::new(MapBody::new(IdentityProcessor, |body: Body| {
1691 if let Some(text) = body.as_text() {
1692 Body::Text(format!("{}-processed", text))
1693 } else {
1694 body
1695 }
1696 })),
1697 ];
1698
1699 let pipeline = compose_pipeline(processors);
1700 let exchange = Exchange::new(Message::new("hello"));
1701 let result = pipeline.oneshot(exchange).await.unwrap();
1702
1703 assert_eq!(
1704 result.input.header("step"),
1705 Some(&Value::String("one".into()))
1706 );
1707 assert_eq!(result.input.body.as_text(), Some("hello-processed"));
1708 }
1709
1710 #[tokio::test]
1711 async fn test_compose_pipeline_empty_is_identity() {
1712 use camel_core::route::compose_pipeline;
1713
1714 let pipeline = compose_pipeline(vec![]);
1715 let exchange = Exchange::new(Message::new("unchanged"));
1716 let result = pipeline.oneshot(exchange).await.unwrap();
1717 assert_eq!(result.input.body.as_text(), Some("unchanged"));
1718 }
1719
1720 #[test]
1725 fn test_builder_circuit_breaker_sets_config() {
1726 use camel_api::circuit_breaker::CircuitBreakerConfig;
1727
1728 let config = CircuitBreakerConfig::new().failure_threshold(5);
1729 let definition = RouteBuilder::from("timer:tick")
1730 .route_id("test-route")
1731 .circuit_breaker(config)
1732 .build()
1733 .unwrap();
1734
1735 let cb = definition
1736 .circuit_breaker_config()
1737 .expect("circuit breaker should be set");
1738 assert_eq!(cb.failure_threshold, 5);
1739 }
1740
1741 #[test]
1742 fn test_builder_circuit_breaker_with_error_handler() {
1743 use camel_api::circuit_breaker::CircuitBreakerConfig;
1744 use camel_api::error_handler::ErrorHandlerConfig;
1745
1746 let cb_config = CircuitBreakerConfig::new().failure_threshold(3);
1747 let eh_config = ErrorHandlerConfig::log_only();
1748
1749 let definition = RouteBuilder::from("timer:tick")
1750 .route_id("test-route")
1751 .to("log:info")
1752 .circuit_breaker(cb_config)
1753 .error_handler(eh_config)
1754 .build()
1755 .unwrap();
1756
1757 assert!(
1758 definition.circuit_breaker_config().is_some(),
1759 "circuit breaker config should be set"
1760 );
1761 }
1763
1764 #[test]
1765 fn test_builder_on_exception_shorthand_multiple_clauses_preserve_order() {
1766 let definition = RouteBuilder::from("direct:start")
1767 .route_id("test-route")
1768 .dead_letter_channel("log:dlc")
1769 .on_exception(|e| matches!(e, CamelError::Io(_)))
1770 .retry(3)
1771 .handled_by("log:io")
1772 .end_on_exception()
1773 .on_exception(|e| matches!(e, CamelError::ProcessorError(_)))
1774 .retry(1)
1775 .end_on_exception()
1776 .to("mock:out")
1777 .build()
1778 .expect("route should build");
1779
1780 let cfg = definition
1781 .error_handler_config()
1782 .expect("error handler should be set");
1783 assert_eq!(cfg.policies.len(), 2);
1784 assert_eq!(cfg.dlc_uri.as_deref(), Some("log:dlc"));
1785 assert_eq!(
1786 cfg.policies[0].retry.as_ref().map(|p| p.max_attempts),
1787 Some(3)
1788 );
1789 assert_eq!(cfg.policies[0].handled_by.as_deref(), Some("log:io"));
1790 assert_eq!(
1791 cfg.policies[1].retry.as_ref().map(|p| p.max_attempts),
1792 Some(1)
1793 );
1794 }
1795
1796 #[test]
1797 fn test_builder_on_exception_mixed_mode_rejected() {
1798 let result = RouteBuilder::from("direct:start")
1799 .route_id("test-route")
1800 .error_handler(ErrorHandlerConfig::log_only())
1801 .on_exception(|_e| true)
1802 .end_on_exception()
1803 .to("mock:out")
1804 .build();
1805
1806 let err = result.err().expect("mixed mode should fail with an error");
1807
1808 assert!(
1809 format!("{err}").contains("mixed error handler modes"),
1810 "unexpected error: {err}"
1811 );
1812 }
1813
1814 #[test]
1815 fn test_builder_on_exception_backoff_and_jitter_without_retry_noop() {
1816 let definition = RouteBuilder::from("direct:start")
1817 .route_id("test-route")
1818 .on_exception(|_e| true)
1819 .with_backoff(Duration::from_millis(5), 3.0, Duration::from_millis(100))
1820 .with_jitter(0.5)
1821 .end_on_exception()
1822 .to("mock:out")
1823 .build()
1824 .expect("route should build");
1825
1826 let cfg = definition
1827 .error_handler_config()
1828 .expect("error handler should be set");
1829 assert_eq!(cfg.policies.len(), 1);
1830 assert!(cfg.policies[0].retry.is_none());
1831 }
1832
1833 #[test]
1834 fn test_builder_dead_letter_channel_without_on_exception_sets_dlc() {
1835 let definition = RouteBuilder::from("direct:start")
1836 .route_id("test-route")
1837 .dead_letter_channel("log:dlc")
1838 .to("mock:out")
1839 .build()
1840 .expect("route should build");
1841
1842 let cfg = definition
1843 .error_handler_config()
1844 .expect("error handler should be set");
1845 assert_eq!(cfg.dlc_uri.as_deref(), Some("log:dlc"));
1846 assert!(cfg.policies.is_empty());
1847 }
1848
1849 #[test]
1850 fn test_builder_dead_letter_channel_called_twice_uses_latest_and_keeps_policies() {
1851 let definition = RouteBuilder::from("direct:start")
1852 .route_id("test-route")
1853 .dead_letter_channel("log:first")
1854 .on_exception(|e| matches!(e, CamelError::Io(_)))
1855 .retry(2)
1856 .end_on_exception()
1857 .dead_letter_channel("log:second")
1858 .to("mock:out")
1859 .build()
1860 .expect("route should build");
1861
1862 let cfg = definition
1863 .error_handler_config()
1864 .expect("error handler should be set");
1865 assert_eq!(cfg.dlc_uri.as_deref(), Some("log:second"));
1866 assert_eq!(cfg.policies.len(), 1);
1867 assert_eq!(
1868 cfg.policies[0].retry.as_ref().map(|p| p.max_attempts),
1869 Some(2)
1870 );
1871 }
1872
1873 #[test]
1874 fn test_builder_on_exception_without_dlc_defaults_to_log_only() {
1875 let definition = RouteBuilder::from("direct:start")
1876 .route_id("test-route")
1877 .on_exception(|e| matches!(e, CamelError::ProcessorError(_)))
1878 .retry(1)
1879 .end_on_exception()
1880 .to("mock:out")
1881 .build()
1882 .expect("route should build");
1883
1884 let cfg = definition
1885 .error_handler_config()
1886 .expect("error handler should be set");
1887 assert!(cfg.dlc_uri.is_none());
1888 assert_eq!(cfg.policies.len(), 1);
1889 }
1890
1891 #[test]
1892 fn test_builder_error_handler_explicit_overwrite_stays_explicit_mode() {
1893 let first = ErrorHandlerConfig::dead_letter_channel("log:first");
1894 let second = ErrorHandlerConfig::dead_letter_channel("log:second");
1895
1896 let definition = RouteBuilder::from("direct:start")
1897 .route_id("test-route")
1898 .error_handler(first)
1899 .error_handler(second)
1900 .to("mock:out")
1901 .build()
1902 .expect("route should build");
1903
1904 let cfg = definition
1905 .error_handler_config()
1906 .expect("error handler should be set");
1907 assert_eq!(cfg.dlc_uri.as_deref(), Some("log:second"));
1908 }
1909
1910 #[test]
1913 fn test_split_builder_typestate() {
1914 use camel_api::splitter::{SplitterConfig, split_body_lines};
1915
1916 let definition = RouteBuilder::from("timer:test?period=1000")
1918 .route_id("test-route")
1919 .split(SplitterConfig::new(split_body_lines()))
1920 .to("mock:per-fragment")
1921 .end_split()
1922 .to("mock:final")
1923 .build()
1924 .unwrap();
1925
1926 assert_eq!(definition.steps().len(), 2);
1928 }
1929
1930 #[test]
1931 fn test_split_builder_steps_collected() {
1932 use camel_api::splitter::{SplitterConfig, split_body_lines};
1933
1934 let definition = RouteBuilder::from("timer:test?period=1000")
1935 .route_id("test-route")
1936 .split(SplitterConfig::new(split_body_lines()))
1937 .set_header("fragment", Value::String("yes".into()))
1938 .to("mock:per-fragment")
1939 .end_split()
1940 .build()
1941 .unwrap();
1942
1943 assert_eq!(definition.steps().len(), 1);
1945 match &definition.steps()[0] {
1946 BuilderStep::Split { steps, .. } => {
1947 assert_eq!(steps.len(), 2); }
1949 other => panic!("Expected Split, got {:?}", other),
1950 }
1951 }
1952
1953 #[test]
1954 fn test_split_builder_config_propagated() {
1955 use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};
1956
1957 let definition = RouteBuilder::from("timer:test?period=1000")
1958 .route_id("test-route")
1959 .split(
1960 SplitterConfig::new(split_body_lines())
1961 .parallel(true)
1962 .parallel_limit(4)
1963 .aggregation(AggregationStrategy::CollectAll),
1964 )
1965 .to("mock:per-fragment")
1966 .end_split()
1967 .build()
1968 .unwrap();
1969
1970 match &definition.steps()[0] {
1971 BuilderStep::Split { config, .. } => {
1972 assert!(config.parallel);
1973 assert_eq!(config.parallel_limit, Some(4));
1974 assert!(matches!(
1975 config.aggregation,
1976 AggregationStrategy::CollectAll
1977 ));
1978 }
1979 other => panic!("Expected Split, got {:?}", other),
1980 }
1981 }
1982
1983 #[test]
1984 fn test_aggregate_builder_adds_step() {
1985 use camel_api::aggregator::AggregatorConfig;
1986 use camel_core::route::BuilderStep;
1987
1988 let definition = RouteBuilder::from("timer:tick")
1989 .route_id("test-route")
1990 .aggregate(
1991 AggregatorConfig::correlate_by("key")
1992 .complete_when_size(2)
1993 .build(),
1994 )
1995 .build()
1996 .unwrap();
1997
1998 assert_eq!(definition.steps().len(), 1);
1999 assert!(matches!(
2000 definition.steps()[0],
2001 BuilderStep::Aggregate { .. }
2002 ));
2003 }
2004
2005 #[test]
2006 fn test_aggregate_in_split_builder() {
2007 use camel_api::aggregator::AggregatorConfig;
2008 use camel_api::splitter::{SplitterConfig, split_body_lines};
2009 use camel_core::route::BuilderStep;
2010
2011 let definition = RouteBuilder::from("timer:tick")
2012 .route_id("test-route")
2013 .split(SplitterConfig::new(split_body_lines()))
2014 .aggregate(
2015 AggregatorConfig::correlate_by("key")
2016 .complete_when_size(1)
2017 .build(),
2018 )
2019 .end_split()
2020 .build()
2021 .unwrap();
2022
2023 assert_eq!(definition.steps().len(), 1);
2024 if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
2025 assert!(matches!(steps[0], BuilderStep::Aggregate { .. }));
2026 } else {
2027 panic!("expected Split step");
2028 }
2029 }
2030
2031 #[test]
2034 fn test_builder_set_body_static_adds_processor() {
2035 let definition = RouteBuilder::from("timer:tick")
2036 .route_id("test-route")
2037 .set_body("fixed")
2038 .build()
2039 .unwrap();
2040 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2041 }
2042
2043 #[test]
2044 fn test_builder_set_body_fn_adds_processor() {
2045 let definition = RouteBuilder::from("timer:tick")
2046 .route_id("test-route")
2047 .set_body_fn(|_ex: &Exchange| Body::Text("dynamic".into()))
2048 .build()
2049 .unwrap();
2050 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2051 }
2052
2053 #[test]
2054 fn transform_alias_produces_same_as_set_body() {
2055 let route_transform = RouteBuilder::from("timer:tick")
2056 .route_id("test-route")
2057 .transform("hello")
2058 .build()
2059 .unwrap();
2060
2061 let route_set_body = RouteBuilder::from("timer:tick")
2062 .route_id("test-route")
2063 .set_body("hello")
2064 .build()
2065 .unwrap();
2066
2067 assert_eq!(route_transform.steps().len(), route_set_body.steps().len());
2068 }
2069
2070 #[test]
2071 fn test_builder_set_header_fn_adds_processor() {
2072 let definition = RouteBuilder::from("timer:tick")
2073 .route_id("test-route")
2074 .set_header_fn("k", |_ex: &Exchange| Value::String("v".into()))
2075 .build()
2076 .unwrap();
2077 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2078 }
2079
2080 #[tokio::test]
2081 async fn test_set_body_static_processor_works() {
2082 use camel_core::route::compose_pipeline;
2083 let def = RouteBuilder::from("t:t")
2084 .route_id("test-route")
2085 .set_body("replaced")
2086 .build()
2087 .unwrap();
2088 let pipeline = compose_pipeline(
2089 def.steps()
2090 .iter()
2091 .filter_map(|s| {
2092 if let BuilderStep::Processor(p) = s {
2093 Some(p.clone())
2094 } else {
2095 None
2096 }
2097 })
2098 .collect(),
2099 );
2100 let exchange = Exchange::new(Message::new("original"));
2101 let result = pipeline.oneshot(exchange).await.unwrap();
2102 assert_eq!(result.input.body.as_text(), Some("replaced"));
2103 }
2104
2105 #[tokio::test]
2106 async fn test_set_body_fn_processor_works() {
2107 use camel_core::route::compose_pipeline;
2108 let def = RouteBuilder::from("t:t")
2109 .route_id("test-route")
2110 .set_body_fn(|ex: &Exchange| {
2111 Body::Text(ex.input.body.as_text().unwrap_or("").to_uppercase())
2112 })
2113 .build()
2114 .unwrap();
2115 let pipeline = compose_pipeline(
2116 def.steps()
2117 .iter()
2118 .filter_map(|s| {
2119 if let BuilderStep::Processor(p) = s {
2120 Some(p.clone())
2121 } else {
2122 None
2123 }
2124 })
2125 .collect(),
2126 );
2127 let exchange = Exchange::new(Message::new("hello"));
2128 let result = pipeline.oneshot(exchange).await.unwrap();
2129 assert_eq!(result.input.body.as_text(), Some("HELLO"));
2130 }
2131
2132 #[tokio::test]
2133 async fn test_set_header_fn_processor_works() {
2134 use camel_core::route::compose_pipeline;
2135 let def = RouteBuilder::from("t:t")
2136 .route_id("test-route")
2137 .set_header_fn("echo", |ex: &Exchange| {
2138 ex.input
2139 .body
2140 .as_text()
2141 .map(|t| Value::String(t.into()))
2142 .unwrap_or(Value::Null)
2143 })
2144 .build()
2145 .unwrap();
2146 let pipeline = compose_pipeline(
2147 def.steps()
2148 .iter()
2149 .filter_map(|s| {
2150 if let BuilderStep::Processor(p) = s {
2151 Some(p.clone())
2152 } else {
2153 None
2154 }
2155 })
2156 .collect(),
2157 );
2158 let exchange = Exchange::new(Message::new("ping"));
2159 let result = pipeline.oneshot(exchange).await.unwrap();
2160 assert_eq!(
2161 result.input.header("echo"),
2162 Some(&Value::String("ping".into()))
2163 );
2164 }
2165
2166 #[test]
2169 fn test_filter_builder_typestate() {
2170 let result = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
2171 .route_id("test-route")
2172 .filter(|_ex| true)
2173 .to("mock:inner")
2174 .end_filter()
2175 .to("mock:outer")
2176 .build();
2177 assert!(result.is_ok());
2178 }
2179
2180 #[test]
2181 fn test_filter_builder_steps_collected() {
2182 let definition = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
2183 .route_id("test-route")
2184 .filter(|_ex| true)
2185 .to("mock:inner")
2186 .end_filter()
2187 .build()
2188 .unwrap();
2189
2190 assert_eq!(definition.steps().len(), 1);
2191 assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
2192 }
2193
2194 #[test]
2195 fn test_wire_tap_builder_adds_step() {
2196 let definition = RouteBuilder::from("timer:tick")
2197 .route_id("test-route")
2198 .wire_tap("mock:tap")
2199 .to("mock:result")
2200 .build()
2201 .unwrap();
2202
2203 assert_eq!(definition.steps().len(), 2);
2204 assert!(
2205 matches!(&definition.steps()[0], BuilderStep::WireTap { uri } if uri == "mock:tap")
2206 );
2207 assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:result"));
2208 }
2209
2210 #[test]
2213 fn test_multicast_builder_typestate() {
2214 let definition = RouteBuilder::from("timer:tick")
2215 .route_id("test-route")
2216 .multicast()
2217 .to("direct:a")
2218 .to("direct:b")
2219 .end_multicast()
2220 .to("mock:result")
2221 .build()
2222 .unwrap();
2223
2224 assert_eq!(definition.steps().len(), 2); }
2226
2227 #[test]
2228 fn test_multicast_builder_steps_collected() {
2229 let definition = RouteBuilder::from("timer:tick")
2230 .route_id("test-route")
2231 .multicast()
2232 .to("direct:a")
2233 .to("direct:b")
2234 .end_multicast()
2235 .build()
2236 .unwrap();
2237
2238 match &definition.steps()[0] {
2239 BuilderStep::Multicast { steps, .. } => {
2240 assert_eq!(steps.len(), 2);
2241 }
2242 other => panic!("Expected Multicast, got {:?}", other),
2243 }
2244 }
2245
2246 #[test]
2249 fn test_builder_concurrent_sets_concurrency() {
2250 use camel_component_api::ConcurrencyModel;
2251
2252 let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2253 .route_id("test-route")
2254 .concurrent(16)
2255 .to("log:info")
2256 .build()
2257 .unwrap();
2258
2259 assert_eq!(
2260 definition.concurrency_override(),
2261 Some(&ConcurrencyModel::Concurrent { max: Some(16) })
2262 );
2263 }
2264
2265 #[test]
2266 fn test_builder_concurrent_zero_means_unbounded() {
2267 use camel_component_api::ConcurrencyModel;
2268
2269 let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2270 .route_id("test-route")
2271 .concurrent(0)
2272 .to("log:info")
2273 .build()
2274 .unwrap();
2275
2276 assert_eq!(
2277 definition.concurrency_override(),
2278 Some(&ConcurrencyModel::Concurrent { max: None })
2279 );
2280 }
2281
2282 #[test]
2283 fn test_builder_sequential_sets_concurrency() {
2284 use camel_component_api::ConcurrencyModel;
2285
2286 let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2287 .route_id("test-route")
2288 .sequential()
2289 .to("log:info")
2290 .build()
2291 .unwrap();
2292
2293 assert_eq!(
2294 definition.concurrency_override(),
2295 Some(&ConcurrencyModel::Sequential)
2296 );
2297 }
2298
2299 #[test]
2300 fn test_builder_default_concurrency_is_none() {
2301 let definition = RouteBuilder::from("timer:tick")
2302 .route_id("test-route")
2303 .to("log:info")
2304 .build()
2305 .unwrap();
2306
2307 assert_eq!(definition.concurrency_override(), None);
2308 }
2309
2310 #[test]
2313 fn test_builder_route_id_sets_id() {
2314 let definition = RouteBuilder::from("timer:tick")
2315 .route_id("my-route")
2316 .build()
2317 .unwrap();
2318
2319 assert_eq!(definition.route_id(), "my-route");
2320 }
2321
2322 #[test]
2323 fn test_build_without_route_id_fails() {
2324 let result = RouteBuilder::from("timer:tick?period=1000")
2325 .to("log:info")
2326 .build();
2327 let err = match result {
2328 Err(e) => e.to_string(),
2329 Ok(_) => panic!("build() should fail without route_id"),
2330 };
2331 assert!(
2332 err.contains("route_id"),
2333 "error should mention route_id, got: {}",
2334 err
2335 );
2336 }
2337
2338 #[test]
2339 fn test_builder_auto_startup_false() {
2340 let definition = RouteBuilder::from("timer:tick")
2341 .route_id("test-route")
2342 .auto_startup(false)
2343 .build()
2344 .unwrap();
2345
2346 assert!(!definition.auto_startup());
2347 }
2348
2349 #[test]
2350 fn test_builder_startup_order_custom() {
2351 let definition = RouteBuilder::from("timer:tick")
2352 .route_id("test-route")
2353 .startup_order(50)
2354 .build()
2355 .unwrap();
2356
2357 assert_eq!(definition.startup_order(), 50);
2358 }
2359
2360 #[test]
2361 fn test_builder_defaults() {
2362 let definition = RouteBuilder::from("timer:tick")
2363 .route_id("test-route")
2364 .build()
2365 .unwrap();
2366
2367 assert_eq!(definition.route_id(), "test-route");
2368 assert!(definition.auto_startup());
2369 assert_eq!(definition.startup_order(), 1000);
2370 }
2371
2372 #[test]
2375 fn test_choice_builder_single_when() {
2376 let definition = RouteBuilder::from("timer:tick")
2377 .route_id("test-route")
2378 .choice()
2379 .when(|ex: &Exchange| ex.input.header("type").is_some())
2380 .to("mock:typed")
2381 .end_when()
2382 .end_choice()
2383 .build()
2384 .unwrap();
2385 assert_eq!(definition.steps().len(), 1);
2386 assert!(
2387 matches!(&definition.steps()[0], BuilderStep::Choice { whens, otherwise }
2388 if whens.len() == 1 && otherwise.is_none())
2389 );
2390 }
2391
2392 #[test]
2393 fn test_choice_builder_when_otherwise() {
2394 let definition = RouteBuilder::from("timer:tick")
2395 .route_id("test-route")
2396 .choice()
2397 .when(|ex: &Exchange| ex.input.header("a").is_some())
2398 .to("mock:a")
2399 .end_when()
2400 .otherwise()
2401 .to("mock:fallback")
2402 .end_otherwise()
2403 .end_choice()
2404 .build()
2405 .unwrap();
2406 assert!(
2407 matches!(&definition.steps()[0], BuilderStep::Choice { whens, otherwise }
2408 if whens.len() == 1 && otherwise.is_some())
2409 );
2410 }
2411
2412 #[test]
2413 fn test_choice_builder_multiple_whens() {
2414 let definition = RouteBuilder::from("timer:tick")
2415 .route_id("test-route")
2416 .choice()
2417 .when(|ex: &Exchange| ex.input.header("a").is_some())
2418 .to("mock:a")
2419 .end_when()
2420 .when(|ex: &Exchange| ex.input.header("b").is_some())
2421 .to("mock:b")
2422 .end_when()
2423 .end_choice()
2424 .build()
2425 .unwrap();
2426 assert!(
2427 matches!(&definition.steps()[0], BuilderStep::Choice { whens, .. }
2428 if whens.len() == 2)
2429 );
2430 }
2431
2432 #[test]
2433 fn test_choice_step_after_choice() {
2434 let definition = RouteBuilder::from("timer:tick")
2436 .route_id("test-route")
2437 .choice()
2438 .when(|_ex: &Exchange| true)
2439 .to("mock:inner")
2440 .end_when()
2441 .end_choice()
2442 .to("mock:outer") .build()
2444 .unwrap();
2445 assert_eq!(definition.steps().len(), 2);
2446 assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2447 }
2448
2449 #[test]
2452 fn test_throttle_builder_typestate() {
2453 let definition = RouteBuilder::from("timer:tick")
2454 .route_id("test-route")
2455 .throttle(10, std::time::Duration::from_secs(1))
2456 .to("mock:result")
2457 .end_throttle()
2458 .build()
2459 .unwrap();
2460
2461 assert_eq!(definition.steps().len(), 1);
2462 assert!(matches!(
2463 &definition.steps()[0],
2464 BuilderStep::Throttle { .. }
2465 ));
2466 }
2467
2468 #[test]
2469 fn test_throttle_builder_with_strategy() {
2470 let definition = RouteBuilder::from("timer:tick")
2471 .route_id("test-route")
2472 .throttle(10, std::time::Duration::from_secs(1))
2473 .strategy(ThrottleStrategy::Reject)
2474 .to("mock:result")
2475 .end_throttle()
2476 .build()
2477 .unwrap();
2478
2479 if let BuilderStep::Throttle { config, .. } = &definition.steps()[0] {
2480 assert_eq!(config.strategy, ThrottleStrategy::Reject);
2481 } else {
2482 panic!("Expected Throttle step");
2483 }
2484 }
2485
2486 #[test]
2487 fn test_throttle_builder_steps_collected() {
2488 let definition = RouteBuilder::from("timer:tick")
2489 .route_id("test-route")
2490 .throttle(5, std::time::Duration::from_secs(1))
2491 .set_header("throttled", Value::Bool(true))
2492 .to("mock:throttled")
2493 .end_throttle()
2494 .build()
2495 .unwrap();
2496
2497 match &definition.steps()[0] {
2498 BuilderStep::Throttle { steps, .. } => {
2499 assert_eq!(steps.len(), 2); }
2501 other => panic!("Expected Throttle, got {:?}", other),
2502 }
2503 }
2504
2505 #[test]
2506 fn test_throttle_step_after_throttle() {
2507 let definition = RouteBuilder::from("timer:tick")
2509 .route_id("test-route")
2510 .throttle(10, std::time::Duration::from_secs(1))
2511 .to("mock:inner")
2512 .end_throttle()
2513 .to("mock:outer")
2514 .build()
2515 .unwrap();
2516
2517 assert_eq!(definition.steps().len(), 2);
2518 assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2519 }
2520
2521 #[test]
2524 fn test_load_balance_builder_typestate() {
2525 let definition = RouteBuilder::from("timer:tick")
2526 .route_id("test-route")
2527 .load_balance()
2528 .round_robin()
2529 .to("mock:a")
2530 .to("mock:b")
2531 .end_load_balance()
2532 .build()
2533 .unwrap();
2534
2535 assert_eq!(definition.steps().len(), 1);
2536 assert!(matches!(
2537 &definition.steps()[0],
2538 BuilderStep::LoadBalance { .. }
2539 ));
2540 }
2541
/// Checks that selecting `.random()` on a load-balance block is stored as
/// `LoadBalanceStrategy::Random` in the resulting step's config.
#[test]
fn test_load_balance_builder_with_strategy() {
    let definition = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .load_balance()
        .random()
        .to("mock:result")
        .end_load_balance()
        .build()
        .unwrap();

    match &definition.steps()[0] {
        BuilderStep::LoadBalance { config, .. } => {
            assert_eq!(config.strategy, LoadBalanceStrategy::Random);
        }
        // Print the step that was actually produced so a mismatch can be
        // diagnosed from the test output alone.
        other => panic!("Expected LoadBalance step, got {:?}", other),
    }
}
2559
/// Adds a `set_header` and a `to` inside a load-balance block and checks that
/// the resulting `BuilderStep::LoadBalance` holds exactly those two nested steps.
#[test]
fn test_load_balance_builder_steps_collected() {
    let balancer = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .load_balance();
    let definition = balancer
        .set_header("lb", Value::Bool(true))
        .to("mock:a")
        .end_load_balance()
        .build()
        .unwrap();

    let first = &definition.steps()[0];
    if let BuilderStep::LoadBalance { steps: nested, .. } = first {
        assert_eq!(nested.len(), 2);
    } else {
        panic!("Expected LoadBalance, got {:?}", first);
    }
}
2578
/// Checks that a `to` added after `end_load_balance()` becomes the second
/// top-level step of the route.
#[test]
fn test_load_balance_step_after_load_balance() {
    let after_balancer = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .load_balance()
        .to("mock:inner")
        .end_load_balance();
    let definition = after_balancer.to("mock:outer").build().unwrap();

    let top_level_count = definition.steps().len();
    assert_eq!(top_level_count, 2);
    assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
}
2594
/// Checks that `dynamic_router` with a closure expression adds exactly one
/// `BuilderStep::DynamicRouter` to the route.
#[test]
fn test_dynamic_router_builder() {
    let builder = RouteBuilder::from("timer:tick").route_id("test-route");
    let definition = builder
        .dynamic_router(Arc::new(|_| Some(String::from("mock:result"))))
        .build()
        .unwrap();

    let step_count = definition.steps().len();
    assert_eq!(step_count, 1);
    assert!(matches!(&definition.steps()[0], BuilderStep::DynamicRouter { .. }));
}
2611
/// Checks that a `DynamicRouterConfig` with custom `max_iterations` and
/// `cache_size` passed to `dynamic_router_with_config` is preserved on the
/// resulting `BuilderStep::DynamicRouter`.
#[test]
fn test_dynamic_router_builder_with_config() {
    let config = DynamicRouterConfig::new(Arc::new(|_| Some("mock:a".to_string())))
        .max_iterations(100)
        .cache_size(500);

    let definition = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .dynamic_router_with_config(config)
        .build()
        .unwrap();

    assert_eq!(definition.steps().len(), 1);
    match &definition.steps()[0] {
        BuilderStep::DynamicRouter { config } => {
            assert_eq!(config.max_iterations, 100);
            assert_eq!(config.cache_size, 500);
        }
        // Print the step that was actually produced so a mismatch can be
        // diagnosed from the test output alone.
        other => panic!("Expected DynamicRouter step, got {:?}", other),
    }
}
2632
/// Checks that a `to` chained after `dynamic_router` is recorded as a second
/// top-level step following the `BuilderStep::DynamicRouter`.
#[test]
fn test_dynamic_router_step_after_router() {
    let with_router = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .dynamic_router(Arc::new(|_| Some(String::from("mock:inner"))));
    let definition = with_router.to("mock:outer").build().unwrap();

    let top_level_count = definition.steps().len();
    assert_eq!(top_level_count, 2);
    assert!(matches!(&definition.steps()[0], BuilderStep::DynamicRouter { .. }));
    assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
}
2650
/// Checks that `routing_slip` with a closure expression puts a
/// `BuilderStep::RoutingSlip` first in the route.
#[test]
fn routing_slip_builder_creates_step() {
    use camel_api::RoutingSlipExpression;

    let expression: RoutingSlipExpression = Arc::new(|_| Some(String::from("direct:a,direct:b")));

    let builder = RouteBuilder::from("direct:start").route_id("routing-slip-test");
    let route = builder.routing_slip(expression).build().unwrap();

    let is_routing_slip = matches!(route.steps()[0], BuilderStep::RoutingSlip { .. });
    assert!(is_routing_slip, "Expected RoutingSlip step");
}
2668
/// Checks that a `RoutingSlipConfig` with a custom delimiter, cache size and
/// `ignore_invalid_endpoints` flag passed to `routing_slip_with_config` is
/// preserved on the resulting `BuilderStep::RoutingSlip`.
#[test]
fn routing_slip_with_config_builder_creates_step() {
    use camel_api::RoutingSlipConfig;

    let config = RoutingSlipConfig::new(Arc::new(|_| Some("mock:a".to_string())))
        .uri_delimiter("|")
        .cache_size(50)
        .ignore_invalid_endpoints(true);

    let route = RouteBuilder::from("direct:start")
        .route_id("routing-slip-config-test")
        .routing_slip_with_config(config)
        .build()
        .unwrap();

    match &route.steps()[0] {
        BuilderStep::RoutingSlip { config } => {
            assert_eq!(config.uri_delimiter, "|");
            assert_eq!(config.cache_size, 50);
            assert!(config.ignore_invalid_endpoints);
        }
        // Print the step that was actually produced so a mismatch can be
        // diagnosed from the test output alone.
        other => panic!("Expected RoutingSlip step, got {:?}", other),
    }
}
2692
/// Checks that `marshal("json")` records a `BuilderStep::Processor` as the
/// first step of the route.
#[test]
fn test_builder_marshal_adds_processor_step() {
    let builder = RouteBuilder::from("timer:tick").route_id("test-route");
    let definition = builder.marshal("json").build().unwrap();

    assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
}
2702
/// Checks that `unmarshal("json")` records a `BuilderStep::Processor` as the
/// first step of the route.
#[test]
fn test_builder_unmarshal_adds_processor_step() {
    let builder = RouteBuilder::from("timer:tick").route_id("test-route");
    let definition = builder.unmarshal("json").build().unwrap();

    assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
}
2712
/// Checks that `stream_cache(1024)` records a `BuilderStep::Processor` as the
/// first step of the route.
#[test]
fn test_builder_stream_cache_adds_processor_step() {
    let builder = RouteBuilder::from("timer:tick").route_id("test-route");
    let definition = builder.stream_cache(1024).build().unwrap();

    assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
}
2722
/// Checks that `validate("schemas/order.xsd")` yields a single `To` step whose
/// URI is the schema path prefixed with `validator:`.
#[test]
fn validate_adds_to_step_with_validator_uri() {
    let builder = RouteBuilder::from("direct:in").route_id("test");
    let definition = builder.validate("schemas/order.xsd").build().unwrap();

    let built = definition.steps();
    assert_eq!(built.len(), 1);

    let targets_validator =
        matches!(&built[0], BuilderStep::To(uri) if uri == "validator:schemas/order.xsd");
    assert!(targets_validator, "got: {:?}", built[0]);
}
2738
/// Expects building a route with `marshal("csv")` to panic with the
/// "unknown data format" message.
#[test]
#[should_panic(expected = "unknown data format: 'csv'")]
fn test_builder_marshal_panics_on_unknown_format() {
    let builder = RouteBuilder::from("timer:tick").route_id("test-route");
    // The build result is irrelevant; only the panic matters.
    let _ = builder.marshal("csv").build();
}
2747
/// Expects building a route with `unmarshal("csv")` to panic with the
/// "unknown data format" message.
#[test]
#[should_panic(expected = "unknown data format: 'csv'")]
fn test_builder_unmarshal_panics_on_unknown_format() {
    let builder = RouteBuilder::from("timer:tick").route_id("test-route");
    // The build result is irrelevant; only the panic matters.
    let _ = builder.unmarshal("csv").build();
}
2756
/// Checks that `recipient_list` with a closure expression puts a
/// `BuilderStep::RecipientList` first in the route.
#[test]
fn test_builder_recipient_list_creates_step() {
    let builder = RouteBuilder::from("direct:start").route_id("recipient-list-test");
    let route = builder
        .recipient_list(Arc::new(|_| String::from("direct:a,direct:b")))
        .build()
        .unwrap();

    assert!(matches!(&route.steps()[0], BuilderStep::RecipientList { .. }));
}
2770
/// Checks that `recipient_list_with_config` with a prebuilt
/// `RecipientListConfig` puts a `BuilderStep::RecipientList` first in the route.
#[test]
fn test_builder_recipient_list_with_config_creates_step() {
    let config = RecipientListConfig::new(Arc::new(|_| String::from("mock:a")));

    let builder = RouteBuilder::from("direct:start").route_id("recipient-list-config-test");
    let route = builder.recipient_list_with_config(config).build().unwrap();

    assert!(matches!(&route.steps()[0], BuilderStep::RecipientList { .. }));
}
2786
/// Checks that `script` records a `BuilderStep::Script` carrying the given
/// language and script text unchanged.
#[test]
fn test_builder_script_adds_script_step() {
    let builder = RouteBuilder::from("direct:start").route_id("script-test");
    let route = builder
        .script("rhai", "headers[\"x\"] = \"y\"")
        .build()
        .unwrap();

    let first = &route.steps()[0];
    let carries_script = matches!(
        first,
        BuilderStep::Script { language, script }
            if language == "rhai" && script == "headers[\"x\"] = \"y\""
    );
    assert!(carries_script);
}
2801
/// Checks that `delay` and `delay_with_header` each add one
/// `BuilderStep::Delay`, giving two steps in total.
#[test]
fn test_builder_delay_and_delay_with_header_add_steps() {
    let first_delay = Duration::from_millis(250);
    let second_delay = Duration::from_millis(500);

    let route = RouteBuilder::from("direct:start")
        .route_id("delay-test")
        .delay(first_delay)
        .delay_with_header(second_delay, "x-delay")
        .build()
        .unwrap();

    assert_eq!(route.steps().len(), 2);
    // Both positions must hold a delay step.
    for index in 0..2 {
        assert!(matches!(&route.steps()[index], BuilderStep::Delay { .. }));
    }
}
2815
/// Checks that `log`, `stop` and `to` are recorded as three steps in the order
/// they were chained.
#[test]
fn test_builder_log_and_stop_add_steps_in_order() {
    let builder = RouteBuilder::from("direct:start").route_id("log-stop-test");
    let route = builder
        .log("hello", LogLevel::Info)
        .stop()
        .to("mock:after")
        .build()
        .unwrap();

    let step_count = route.steps().len();
    assert_eq!(step_count, 3);

    assert!(matches!(&route.steps()[0], BuilderStep::Log { message, .. } if message == "hello"));
    assert!(matches!(&route.steps()[1], BuilderStep::Stop));
    assert!(matches!(&route.steps()[2], BuilderStep::To(uri) if uri == "mock:after"));
}
2834
/// Checks that `stream_cache_default()` records a `BuilderStep::Processor` as
/// the first step of the route.
#[test]
fn test_builder_stream_cache_default_adds_processor_step() {
    let builder = RouteBuilder::from("direct:start").route_id("stream-cache-default-test");
    let route = builder.stream_cache_default().build().unwrap();

    assert!(matches!(&route.steps()[0], BuilderStep::Processor(_)));
}
2845
/// Checks that `validate` given a URI already starting with `validator:`
/// produces a `To` step with that same URI, without a second prefix.
#[test]
fn test_validate_preserves_existing_validator_prefix() {
    let builder = RouteBuilder::from("direct:in").route_id("validate-prefix-test");
    let route = builder
        .validate("validator:schemas/order.xsd")
        .build()
        .unwrap();

    assert!(matches!(&route.steps()[0], BuilderStep::To(uri) if uri == "validator:schemas/order.xsd"));
}
2859
/// Chains `weighted`, `failover` and `parallel(true)` on one load-balance
/// block and checks that the resulting config reports
/// `LoadBalanceStrategy::Failover` with `parallel` enabled.
#[test]
fn test_load_balance_builder_weighted_failover_parallel_config() {
    let route = RouteBuilder::from("direct:start")
        .route_id("lb-weighted-failover-parallel")
        .load_balance()
        .weighted(vec![
            ("direct:a".to_string(), 3),
            ("direct:b".to_string(), 1),
        ])
        .failover()
        .parallel(true)
        .to("mock:result")
        .end_load_balance()
        .build()
        .unwrap();

    match &route.steps()[0] {
        BuilderStep::LoadBalance { config, .. } => {
            assert_eq!(config.strategy, LoadBalanceStrategy::Failover);
            assert!(config.parallel);
        }
        // Print the step that was actually produced so a mismatch can be
        // diagnosed from the test output alone.
        other => panic!("Expected LoadBalance step, got {:?}", other),
    }
}
2883
/// Exercises every multicast config setter used here (`parallel`,
/// `parallel_limit`, `stop_on_exception`, `timeout`, `aggregation`) and checks
/// that each value is present on the resulting `BuilderStep::Multicast` config.
#[test]
fn test_multicast_builder_all_config_setters() {
    let route = RouteBuilder::from("direct:start")
        .route_id("multicast-config-test")
        .multicast()
        .parallel(true)
        .parallel_limit(4)
        .stop_on_exception(true)
        .timeout(Duration::from_millis(300))
        .aggregation(MulticastStrategy::Original)
        .to("mock:a")
        .end_multicast()
        .build()
        .unwrap();

    match &route.steps()[0] {
        BuilderStep::Multicast { config, .. } => {
            assert!(config.parallel);
            assert_eq!(config.parallel_limit, Some(4));
            assert!(config.stop_on_exception);
            assert_eq!(config.timeout, Some(Duration::from_millis(300)));
            assert!(matches!(config.aggregation, MulticastStrategy::Original));
        }
        // Print the step that was actually produced so a mismatch can be
        // diagnosed from the test output alone.
        other => panic!("Expected Multicast step, got {:?}", other),
    }
}
2909
/// Checks that `build_canonical` fails for a route containing a `set_header`
/// step, and that the error text names the unsupported `processor` step.
#[test]
fn test_build_canonical_rejects_unsupported_processor_step() {
    let builder = RouteBuilder::from("direct:start")
        .route_id("canonical-reject")
        .set_header("k", Value::String("v".into()));
    let err = builder.build_canonical().unwrap_err();

    let rendered = err.to_string();
    assert!(rendered.contains("does not support step `processor`"));
}
2920}