1use camel_api::DelayConfig;
2use camel_api::aggregator::{
3 AggregationStrategy, AggregatorConfig, CompletionCondition, CompletionMode, CorrelationStrategy,
4};
5use camel_api::body::Body;
6use camel_api::body_converter::BodyType;
7use camel_api::circuit_breaker::CircuitBreakerConfig;
8use camel_api::dynamic_router::{DynamicRouterConfig, RouterExpression};
9use camel_api::error_handler::{ErrorHandlerConfig, RedeliveryPolicy};
10use camel_api::load_balancer::LoadBalancerConfig;
11use camel_api::loop_eip::{LoopConfig, LoopMode};
12use camel_api::multicast::{MulticastConfig, MulticastStrategy};
13use camel_api::recipient_list::{RecipientListConfig, RecipientListExpression};
14use camel_api::routing_slip::{RoutingSlipConfig, RoutingSlipExpression};
15use camel_api::splitter::SplitterConfig;
16use camel_api::throttler::{ThrottleStrategy, ThrottlerConfig};
17use camel_api::{
18 BoxProcessor, CamelError, CanonicalRouteSpec, Exchange, FilterPredicate, IdentityProcessor,
19 ProcessorFn, Value,
20 runtime::{
21 CanonicalAggregateSpec, CanonicalAggregateStrategySpec, CanonicalCircuitBreakerSpec,
22 CanonicalSplitAggregationSpec, CanonicalSplitExpressionSpec, CanonicalStepSpec,
23 CanonicalWhenSpec,
24 },
25};
26use camel_component_api::ConcurrencyModel;
27use camel_core::route::{BuilderStep, DeclarativeWhenStep, RouteDefinition, WhenStep};
28use camel_processor::{
29 ConvertBodyTo, DynamicSetHeader, LogLevel, MapBody, MarshalService, SetBody, SetHeader,
30 StreamCacheService, UnmarshalService, builtin_data_format,
31};
32
33pub trait StepAccumulator: Sized {
39 fn steps_mut(&mut self) -> &mut Vec<BuilderStep>;
40
41 fn to(mut self, endpoint: impl Into<String>) -> Self {
42 self.steps_mut().push(BuilderStep::To(endpoint.into()));
43 self
44 }
45
46 fn process<F, Fut>(mut self, f: F) -> Self
47 where
48 F: Fn(Exchange) -> Fut + Send + Sync + 'static,
49 Fut: std::future::Future<Output = Result<Exchange, CamelError>> + Send + 'static,
50 {
51 let svc = ProcessorFn::new(f);
52 self.steps_mut()
53 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
54 self
55 }
56
57 fn process_fn(mut self, processor: BoxProcessor) -> Self {
58 self.steps_mut().push(BuilderStep::Processor(processor));
59 self
60 }
61
62 fn set_header(mut self, key: impl Into<String>, value: impl Into<Value>) -> Self {
63 let svc = SetHeader::new(IdentityProcessor, key, value);
64 self.steps_mut()
65 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
66 self
67 }
68
69 fn map_body<F>(mut self, mapper: F) -> Self
70 where
71 F: Fn(Body) -> Body + Clone + Send + Sync + 'static,
72 {
73 let svc = MapBody::new(IdentityProcessor, mapper);
74 self.steps_mut()
75 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
76 self
77 }
78
79 fn set_body<B>(mut self, body: B) -> Self
80 where
81 B: Into<Body> + Clone + Send + Sync + 'static,
82 {
83 let body: Body = body.into();
84 let svc = SetBody::new(IdentityProcessor, move |_ex: &Exchange| body.clone());
85 self.steps_mut()
86 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
87 self
88 }
89
90 fn transform<B>(self, body: B) -> Self
95 where
96 B: Into<Body> + Clone + Send + Sync + 'static,
97 {
98 self.set_body(body)
99 }
100
101 fn set_body_fn<F>(mut self, expr: F) -> Self
102 where
103 F: Fn(&Exchange) -> Body + Clone + Send + Sync + 'static,
104 {
105 let svc = SetBody::new(IdentityProcessor, expr);
106 self.steps_mut()
107 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
108 self
109 }
110
111 fn set_header_fn<F>(mut self, key: impl Into<String>, expr: F) -> Self
112 where
113 F: Fn(&Exchange) -> Value + Clone + Send + Sync + 'static,
114 {
115 let svc = DynamicSetHeader::new(IdentityProcessor, key, expr);
116 self.steps_mut()
117 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
118 self
119 }
120
121 fn aggregate(mut self, config: AggregatorConfig) -> Self {
122 self.steps_mut().push(BuilderStep::Aggregate { config });
123 self
124 }
125
126 fn stop(mut self) -> Self {
132 self.steps_mut().push(BuilderStep::Stop);
133 self
134 }
135
136 fn delay(mut self, duration: std::time::Duration) -> Self {
137 self.steps_mut().push(BuilderStep::Delay {
138 config: DelayConfig::from_duration(duration),
139 });
140 self
141 }
142
143 fn delay_with_header(
144 mut self,
145 duration: std::time::Duration,
146 header: impl Into<String>,
147 ) -> Self {
148 self.steps_mut().push(BuilderStep::Delay {
149 config: DelayConfig::from_duration_with_header(duration, header),
150 });
151 self
152 }
153
154 fn log(mut self, message: impl Into<String>, level: LogLevel) -> Self {
158 self.steps_mut().push(BuilderStep::Log {
159 level,
160 message: message.into(),
161 });
162 self
163 }
164
165 fn convert_body_to(mut self, target: BodyType) -> Self {
177 let svc = ConvertBodyTo::new(IdentityProcessor, target);
178 self.steps_mut()
179 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
180 self
181 }
182
183 fn stream_cache(mut self, threshold: usize) -> Self {
184 let config = camel_api::stream_cache::StreamCacheConfig::new(threshold);
185 let svc = StreamCacheService::new(IdentityProcessor, config);
186 self.steps_mut()
187 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
188 self
189 }
190
191 fn stream_cache_default(self) -> Self {
195 self.stream_cache(camel_api::stream_cache::DEFAULT_STREAM_CACHE_THRESHOLD)
196 }
197
198 fn marshal(mut self, format: impl Into<String>) -> Self {
208 let name = format.into();
209 let df =
210 builtin_data_format(&name).unwrap_or_else(|| panic!("unknown data format: '{name}'"));
211 let svc = MarshalService::new(IdentityProcessor, df);
212 self.steps_mut()
213 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
214 self
215 }
216
217 fn unmarshal(mut self, format: impl Into<String>) -> Self {
227 let name = format.into();
228 let df =
229 builtin_data_format(&name).unwrap_or_else(|| panic!("unknown data format: '{name}'"));
230 let svc = UnmarshalService::new(IdentityProcessor, df);
231 self.steps_mut()
232 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
233 self
234 }
235
236 fn validate(mut self, schema_path: impl Into<String>) -> Self {
245 let path = schema_path.into();
246 let uri = if path.starts_with("validator:") {
247 path
248 } else {
249 format!("validator:{path}")
250 };
251 self.steps_mut().push(BuilderStep::To(uri));
252 self
253 }
254
255 fn script(mut self, language: impl Into<String>, script: impl Into<String>) -> Self {
266 self.steps_mut().push(BuilderStep::Script {
267 language: language.into(),
268 script: script.into(),
269 });
270 self
271 }
272
273 fn bean(mut self, name: impl Into<String>, method: impl Into<String>) -> Self {
274 self.steps_mut().push(BuilderStep::Bean {
275 name: name.into(),
276 method: method.into(),
277 });
278 self
279 }
280}
281
282pub struct RouteBuilder {
294 from_uri: String,
295 steps: Vec<BuilderStep>,
296 error_handler: Option<ErrorHandlerConfig>,
297 error_handler_mode: ErrorHandlerMode,
298 circuit_breaker_config: Option<CircuitBreakerConfig>,
299 concurrency: Option<ConcurrencyModel>,
300 route_id: Option<String>,
301 auto_startup: Option<bool>,
302 startup_order: Option<i32>,
303}
304
305#[derive(Default)]
306enum ErrorHandlerMode {
307 #[default]
308 None,
309 ExplicitConfig,
310 Shorthand {
311 dlc_uri: Option<String>,
312 specs: Vec<OnExceptionSpec>,
313 },
314 Mixed,
315}
316
317#[derive(Clone)]
318struct OnExceptionSpec {
319 matches: std::sync::Arc<dyn Fn(&CamelError) -> bool + Send + Sync>,
320 retry: Option<RedeliveryPolicy>,
321 handled_by: Option<String>,
322}
323
324impl RouteBuilder {
325 pub fn from(endpoint: &str) -> Self {
327 Self {
328 from_uri: endpoint.to_string(),
329 steps: Vec::new(),
330 error_handler: None,
331 error_handler_mode: ErrorHandlerMode::None,
332 circuit_breaker_config: None,
333 concurrency: None,
334 route_id: None,
335 auto_startup: None,
336 startup_order: None,
337 }
338 }
339
340 pub fn filter<F>(self, predicate: F) -> FilterBuilder
344 where
345 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
346 {
347 FilterBuilder {
348 parent: self,
349 predicate: std::sync::Arc::new(predicate),
350 steps: vec![],
351 }
352 }
353
354 pub fn choice(self) -> ChoiceBuilder {
360 ChoiceBuilder {
361 parent: self,
362 whens: vec![],
363 _otherwise: None,
364 }
365 }
366
367 pub fn wire_tap(mut self, endpoint: &str) -> Self {
371 self.steps.push(BuilderStep::WireTap {
372 uri: endpoint.to_string(),
373 });
374 self
375 }
376
377 pub fn error_handler(mut self, config: ErrorHandlerConfig) -> Self {
379 self.error_handler_mode = match self.error_handler_mode {
380 ErrorHandlerMode::None | ErrorHandlerMode::ExplicitConfig => {
381 ErrorHandlerMode::ExplicitConfig
382 }
383 ErrorHandlerMode::Shorthand { .. } | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
384 };
385 self.error_handler = Some(config);
386 self
387 }
388
389 pub fn dead_letter_channel(mut self, uri: impl Into<String>) -> Self {
391 let uri = uri.into();
392 self.error_handler_mode = match self.error_handler_mode {
393 ErrorHandlerMode::None => ErrorHandlerMode::Shorthand {
394 dlc_uri: Some(uri),
395 specs: Vec::new(),
396 },
397 ErrorHandlerMode::Shorthand { specs, .. } => ErrorHandlerMode::Shorthand {
398 dlc_uri: Some(uri),
399 specs,
400 },
401 ErrorHandlerMode::ExplicitConfig | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
402 };
403 self
404 }
405
406 pub fn on_exception<F>(mut self, matches: F) -> OnExceptionBuilder
408 where
409 F: Fn(&CamelError) -> bool + Send + Sync + 'static,
410 {
411 self.error_handler_mode = match self.error_handler_mode {
412 ErrorHandlerMode::None => ErrorHandlerMode::Shorthand {
413 dlc_uri: None,
414 specs: Vec::new(),
415 },
416 ErrorHandlerMode::ExplicitConfig | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
417 shorthand @ ErrorHandlerMode::Shorthand { .. } => shorthand,
418 };
419
420 OnExceptionBuilder {
421 parent: self,
422 policy: OnExceptionSpec {
423 matches: std::sync::Arc::new(matches),
424 retry: None,
425 handled_by: None,
426 },
427 }
428 }
429
430 pub fn circuit_breaker(mut self, config: CircuitBreakerConfig) -> Self {
432 self.circuit_breaker_config = Some(config);
433 self
434 }
435
436 pub fn concurrent(mut self, max: usize) -> Self {
450 let max = if max == 0 { None } else { Some(max) };
451 self.concurrency = Some(ConcurrencyModel::Concurrent { max });
452 self
453 }
454
455 pub fn sequential(mut self) -> Self {
460 self.concurrency = Some(ConcurrencyModel::Sequential);
461 self
462 }
463
464 pub fn route_id(mut self, id: impl Into<String>) -> Self {
468 self.route_id = Some(id.into());
469 self
470 }
471
472 pub fn auto_startup(mut self, auto: bool) -> Self {
476 self.auto_startup = Some(auto);
477 self
478 }
479
480 pub fn startup_order(mut self, order: i32) -> Self {
484 self.startup_order = Some(order);
485 self
486 }
487
488 pub fn split(self, config: SplitterConfig) -> SplitBuilder {
494 SplitBuilder {
495 parent: self,
496 config,
497 steps: Vec::new(),
498 }
499 }
500
501 pub fn multicast(self) -> MulticastBuilder {
507 MulticastBuilder {
508 parent: self,
509 steps: Vec::new(),
510 config: MulticastConfig::new(),
511 }
512 }
513
514 pub fn throttle(self, max_requests: usize, period: std::time::Duration) -> ThrottleBuilder {
521 ThrottleBuilder {
522 parent: self,
523 config: ThrottlerConfig::new(max_requests, period),
524 steps: Vec::new(),
525 }
526 }
527
528 pub fn loop_count(self, count: usize) -> LoopBuilder {
530 LoopBuilder {
531 parent: self,
532 config: LoopConfig {
533 mode: LoopMode::Count(count),
534 },
535 steps: vec![],
536 }
537 }
538
539 pub fn loop_while<F>(self, predicate: F) -> LoopBuilder
541 where
542 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
543 {
544 LoopBuilder {
545 parent: self,
546 config: LoopConfig {
547 mode: LoopMode::While(std::sync::Arc::new(predicate)),
548 },
549 steps: vec![],
550 }
551 }
552
553 pub fn load_balance(self) -> LoadBalancerBuilder {
559 LoadBalancerBuilder {
560 parent: self,
561 config: LoadBalancerConfig::round_robin(),
562 steps: Vec::new(),
563 }
564 }
565
566 pub fn dynamic_router(self, expression: RouterExpression) -> Self {
582 self.dynamic_router_with_config(DynamicRouterConfig::new(expression))
583 }
584
585 pub fn dynamic_router_with_config(mut self, config: DynamicRouterConfig) -> Self {
589 self.steps.push(BuilderStep::DynamicRouter { config });
590 self
591 }
592
593 pub fn routing_slip(self, expression: RoutingSlipExpression) -> Self {
594 self.routing_slip_with_config(RoutingSlipConfig::new(expression))
595 }
596
597 pub fn routing_slip_with_config(mut self, config: RoutingSlipConfig) -> Self {
598 self.steps.push(BuilderStep::RoutingSlip { config });
599 self
600 }
601
602 pub fn recipient_list(self, expression: RecipientListExpression) -> Self {
603 self.recipient_list_with_config(RecipientListConfig::new(expression))
604 }
605
606 pub fn recipient_list_with_config(mut self, config: RecipientListConfig) -> Self {
607 self.steps.push(BuilderStep::RecipientList { config });
608 self
609 }
610
611 pub fn build(self) -> Result<RouteDefinition, CamelError> {
613 if self.from_uri.is_empty() {
614 return Err(CamelError::RouteError(
615 "route must have a 'from' URI".to_string(),
616 ));
617 }
618 let route_id = self.route_id.ok_or_else(|| {
619 CamelError::RouteError(
620 "route must have a 'route_id' — call .route_id(\"name\") on the builder"
621 .to_string(),
622 )
623 })?;
624 let resolved_error_handler = match self.error_handler_mode {
625 ErrorHandlerMode::None => self.error_handler,
626 ErrorHandlerMode::ExplicitConfig => self.error_handler,
627 ErrorHandlerMode::Mixed => {
628 return Err(CamelError::RouteError(
629 "mixed error handler modes: cannot combine .error_handler(config) with shorthand methods".into(),
630 ));
631 }
632 ErrorHandlerMode::Shorthand { dlc_uri, specs } => {
633 let mut config = if let Some(uri) = dlc_uri {
634 ErrorHandlerConfig::dead_letter_channel(uri)
635 } else {
636 ErrorHandlerConfig::log_only()
637 };
638
639 for spec in specs {
640 let matcher = spec.matches.clone();
641 let mut builder = config.on_exception(move |e| matcher(e));
642
643 if let Some(retry) = spec.retry {
644 builder = builder.retry(retry.max_attempts).with_backoff(
645 retry.initial_delay,
646 retry.multiplier,
647 retry.max_delay,
648 );
649 if retry.jitter_factor > 0.0 {
650 builder = builder.with_jitter(retry.jitter_factor);
651 }
652 }
653
654 if let Some(uri) = spec.handled_by {
655 builder = builder.handled_by(uri);
656 }
657
658 config = builder.build();
659 }
660
661 Some(config)
662 }
663 };
664
665 let definition = RouteDefinition::new(self.from_uri, self.steps);
666 let definition = if let Some(eh) = resolved_error_handler {
667 definition.with_error_handler(eh)
668 } else {
669 definition
670 };
671 let definition = if let Some(cb) = self.circuit_breaker_config {
672 definition.with_circuit_breaker(cb)
673 } else {
674 definition
675 };
676 let definition = if let Some(concurrency) = self.concurrency {
677 definition.with_concurrency(concurrency)
678 } else {
679 definition
680 };
681 let definition = definition.with_route_id(route_id);
682 let definition = if let Some(auto) = self.auto_startup {
683 definition.with_auto_startup(auto)
684 } else {
685 definition
686 };
687 let definition = if let Some(order) = self.startup_order {
688 definition.with_startup_order(order)
689 } else {
690 definition
691 };
692 Ok(definition)
693 }
694
695 pub fn build_canonical(self) -> Result<CanonicalRouteSpec, CamelError> {
697 if self.from_uri.is_empty() {
698 return Err(CamelError::RouteError(
699 "route must have a 'from' URI".to_string(),
700 ));
701 }
702 let route_id = self.route_id.ok_or_else(|| {
703 CamelError::RouteError(
704 "route must have a 'route_id' — call .route_id(\"name\") on the builder"
705 .to_string(),
706 )
707 })?;
708
709 let steps = canonicalize_steps(self.steps)?;
710 let circuit_breaker = self
711 .circuit_breaker_config
712 .map(canonicalize_circuit_breaker);
713
714 let spec = CanonicalRouteSpec {
715 route_id,
716 from: self.from_uri,
717 steps,
718 circuit_breaker,
719 version: camel_api::CANONICAL_CONTRACT_VERSION,
720 };
721 spec.validate_contract()?;
722 Ok(spec)
723 }
724}
725
726pub struct OnExceptionBuilder {
727 parent: RouteBuilder,
728 policy: OnExceptionSpec,
729}
730
731impl OnExceptionBuilder {
732 pub fn retry(mut self, max_attempts: u32) -> Self {
733 self.policy.retry = Some(RedeliveryPolicy::new(max_attempts));
734 self
735 }
736
737 pub fn with_backoff(
738 mut self,
739 initial: std::time::Duration,
740 multiplier: f64,
741 max: std::time::Duration,
742 ) -> Self {
743 if let Some(ref mut retry) = self.policy.retry {
744 retry.initial_delay = initial;
745 retry.multiplier = multiplier;
746 retry.max_delay = max;
747 }
748 self
749 }
750
751 pub fn with_jitter(mut self, jitter_factor: f64) -> Self {
752 if let Some(ref mut retry) = self.policy.retry {
753 retry.jitter_factor = jitter_factor.clamp(0.0, 1.0);
754 }
755 self
756 }
757
758 pub fn handled_by(mut self, uri: impl Into<String>) -> Self {
759 self.policy.handled_by = Some(uri.into());
760 self
761 }
762
763 pub fn end_on_exception(mut self) -> RouteBuilder {
764 if let ErrorHandlerMode::Shorthand { ref mut specs, .. } = self.parent.error_handler_mode {
765 specs.push(self.policy);
766 }
767 self.parent
768 }
769}
770
771fn canonicalize_steps(steps: Vec<BuilderStep>) -> Result<Vec<CanonicalStepSpec>, CamelError> {
772 let mut canonical = Vec::with_capacity(steps.len());
773 for step in steps {
774 canonical.push(canonicalize_step(step)?);
775 }
776 Ok(canonical)
777}
778
779fn canonicalize_step(step: BuilderStep) -> Result<CanonicalStepSpec, CamelError> {
780 match step {
781 BuilderStep::To(uri) => Ok(CanonicalStepSpec::To { uri }),
782 BuilderStep::Log { message, .. } => Ok(CanonicalStepSpec::Log { message }),
783 BuilderStep::Stop => Ok(CanonicalStepSpec::Stop),
784 BuilderStep::WireTap { uri } => Ok(CanonicalStepSpec::WireTap { uri }),
785 BuilderStep::Delay { config } => Ok(CanonicalStepSpec::Delay {
786 delay_ms: config.delay_ms,
787 dynamic_header: config.dynamic_header,
788 }),
789 BuilderStep::DeclarativeScript { expression } => {
790 Ok(CanonicalStepSpec::Script { expression })
791 }
792 BuilderStep::DeclarativeFilter { predicate, steps } => Ok(CanonicalStepSpec::Filter {
793 predicate,
794 steps: canonicalize_steps(steps)?,
795 }),
796 BuilderStep::DeclarativeChoice { whens, otherwise } => {
797 let mut canonical_whens = Vec::with_capacity(whens.len());
798 for DeclarativeWhenStep { predicate, steps } in whens {
799 canonical_whens.push(CanonicalWhenSpec {
800 predicate,
801 steps: canonicalize_steps(steps)?,
802 });
803 }
804 let otherwise = match otherwise {
805 Some(steps) => Some(canonicalize_steps(steps)?),
806 None => None,
807 };
808 Ok(CanonicalStepSpec::Choice {
809 whens: canonical_whens,
810 otherwise,
811 })
812 }
813 BuilderStep::DeclarativeSplit {
814 expression,
815 aggregation,
816 parallel,
817 parallel_limit,
818 stop_on_exception,
819 steps,
820 } => Ok(CanonicalStepSpec::Split {
821 expression: CanonicalSplitExpressionSpec::Language(expression),
822 aggregation: canonicalize_split_aggregation(aggregation)?,
823 parallel,
824 parallel_limit,
825 stop_on_exception,
826 steps: canonicalize_steps(steps)?,
827 }),
828 BuilderStep::Aggregate { config } => Ok(CanonicalStepSpec::Aggregate {
829 config: canonicalize_aggregate(config)?,
830 }),
831 other => {
832 let step_name = canonical_step_name(&other);
833 let detail = camel_api::canonical_contract_rejection_reason(step_name)
834 .unwrap_or("not included in canonical v1");
835 Err(CamelError::RouteError(format!(
836 "canonical v1 does not support step `{step_name}`: {detail}"
837 )))
838 }
839 }
840}
841
842fn canonicalize_split_aggregation(
843 strategy: camel_api::splitter::AggregationStrategy,
844) -> Result<CanonicalSplitAggregationSpec, CamelError> {
845 match strategy {
846 camel_api::splitter::AggregationStrategy::LastWins => {
847 Ok(CanonicalSplitAggregationSpec::LastWins)
848 }
849 camel_api::splitter::AggregationStrategy::CollectAll => {
850 Ok(CanonicalSplitAggregationSpec::CollectAll)
851 }
852 camel_api::splitter::AggregationStrategy::Custom(_) => Err(CamelError::RouteError(
853 "canonical v1 does not support custom split aggregation".to_string(),
854 )),
855 camel_api::splitter::AggregationStrategy::Original => {
856 Ok(CanonicalSplitAggregationSpec::Original)
857 }
858 }
859}
860
861fn extract_completion_fields(
862 mode: &CompletionMode,
863) -> Result<(Option<usize>, Option<u64>), CamelError> {
864 match mode {
865 CompletionMode::Single(cond) => match cond {
866 CompletionCondition::Size(n) => Ok((Some(*n), None)),
867 CompletionCondition::Timeout(d) => Ok((None, Some(d.as_millis() as u64))),
868 CompletionCondition::Predicate(_) => Err(CamelError::RouteError(
869 "canonical v1 does not support aggregate predicate completion".to_string(),
870 )),
871 },
872 CompletionMode::Any(conds) => {
873 let mut size = None;
874 let mut timeout_ms = None;
875 for cond in conds {
876 match cond {
877 CompletionCondition::Size(n) => size = Some(*n),
878 CompletionCondition::Timeout(d) => timeout_ms = Some(d.as_millis() as u64),
879 CompletionCondition::Predicate(_) => {
880 return Err(CamelError::RouteError(
881 "canonical v1 does not support aggregate predicate completion"
882 .to_string(),
883 ));
884 }
885 }
886 }
887 Ok((size, timeout_ms))
888 }
889 }
890}
891
892fn canonicalize_aggregate(config: AggregatorConfig) -> Result<CanonicalAggregateSpec, CamelError> {
893 let (completion_size, completion_timeout_ms) = extract_completion_fields(&config.completion)?;
894
895 let header = match &config.correlation {
896 CorrelationStrategy::HeaderName(h) => h.clone(),
897 CorrelationStrategy::Expression { expr, .. } => expr.clone(),
898 CorrelationStrategy::Fn(_) => {
899 return Err(CamelError::RouteError(
900 "canonical v1 does not support Fn correlation strategy".to_string(),
901 ));
902 }
903 };
904
905 let correlation_key = match &config.correlation {
906 CorrelationStrategy::HeaderName(_) => None,
907 CorrelationStrategy::Expression { expr, .. } => Some(expr.clone()),
908 CorrelationStrategy::Fn(_) => unreachable!(),
909 };
910
911 let strategy = match config.strategy {
912 AggregationStrategy::CollectAll => CanonicalAggregateStrategySpec::CollectAll,
913 AggregationStrategy::Custom(_) => {
914 return Err(CamelError::RouteError(
915 "canonical v1 does not support custom aggregate strategy".to_string(),
916 ));
917 }
918 };
919 let bucket_ttl_ms = config
920 .bucket_ttl
921 .map(|ttl| u64::try_from(ttl.as_millis()).unwrap_or(u64::MAX));
922
923 Ok(CanonicalAggregateSpec {
924 header,
925 completion_size,
926 completion_timeout_ms,
927 correlation_key,
928 force_completion_on_stop: if config.force_completion_on_stop {
929 Some(true)
930 } else {
931 None
932 },
933 discard_on_timeout: if config.discard_on_timeout {
934 Some(true)
935 } else {
936 None
937 },
938 strategy,
939 max_buckets: config.max_buckets,
940 bucket_ttl_ms,
941 })
942}
943
944fn canonicalize_circuit_breaker(config: CircuitBreakerConfig) -> CanonicalCircuitBreakerSpec {
945 CanonicalCircuitBreakerSpec {
946 failure_threshold: config.failure_threshold,
947 open_duration_ms: u64::try_from(config.open_duration.as_millis()).unwrap_or(u64::MAX),
948 }
949}
950
951fn canonical_step_name(step: &BuilderStep) -> &'static str {
952 match step {
953 BuilderStep::Processor(_) => "processor",
954 BuilderStep::To(_) => "to",
955 BuilderStep::Stop => "stop",
956 BuilderStep::Log { .. } => "log",
957 BuilderStep::DeclarativeSetHeader { .. } => "set_header",
958 BuilderStep::DeclarativeSetBody { .. } => "set_body",
959 BuilderStep::DeclarativeFilter { .. } => "filter",
960 BuilderStep::DeclarativeChoice { .. } => "choice",
961 BuilderStep::DeclarativeScript { .. } => "script",
962 BuilderStep::DeclarativeSplit { .. } => "split",
963 BuilderStep::Split { .. } => "split",
964 BuilderStep::Loop { .. } | BuilderStep::DeclarativeLoop { .. } => "loop",
965 BuilderStep::Aggregate { .. } => "aggregate",
966 BuilderStep::Filter { .. } => "filter",
967 BuilderStep::Choice { .. } => "choice",
968 BuilderStep::WireTap { .. } => "wire_tap",
969 BuilderStep::Delay { .. } => "delay",
970 BuilderStep::Multicast { .. } => "multicast",
971 BuilderStep::DeclarativeLog { .. } => "log",
972 BuilderStep::Bean { .. } => "bean",
973 BuilderStep::Script { .. } => "script",
974 BuilderStep::Throttle { .. } => "throttle",
975 BuilderStep::LoadBalance { .. } => "load_balancer",
976 BuilderStep::DynamicRouter { .. } => "dynamic_router",
977 BuilderStep::RoutingSlip { .. } => "routing_slip",
978 BuilderStep::DeclarativeDynamicRouter { .. } => "declarative_dynamic_router",
979 BuilderStep::DeclarativeRoutingSlip { .. } => "declarative_routing_slip",
980 BuilderStep::RecipientList { .. } => "recipient_list",
981 BuilderStep::DeclarativeRecipientList { .. } => "declarative_recipient_list",
982 }
983}
984
985impl StepAccumulator for RouteBuilder {
986 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
987 &mut self.steps
988 }
989}
990
991pub struct SplitBuilder {
999 parent: RouteBuilder,
1000 config: SplitterConfig,
1001 steps: Vec<BuilderStep>,
1002}
1003
1004impl SplitBuilder {
1005 pub fn filter<F>(self, predicate: F) -> FilterInSplitBuilder
1007 where
1008 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1009 {
1010 FilterInSplitBuilder {
1011 parent: self,
1012 predicate: std::sync::Arc::new(predicate),
1013 steps: vec![],
1014 }
1015 }
1016
1017 pub fn end_split(mut self) -> RouteBuilder {
1020 let split_step = BuilderStep::Split {
1021 config: self.config,
1022 steps: self.steps,
1023 };
1024 self.parent.steps.push(split_step);
1025 self.parent
1026 }
1027}
1028
1029impl StepAccumulator for SplitBuilder {
1030 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1031 &mut self.steps
1032 }
1033}
1034
1035pub struct FilterBuilder {
1037 parent: RouteBuilder,
1038 predicate: FilterPredicate,
1039 steps: Vec<BuilderStep>,
1040}
1041
1042impl FilterBuilder {
1043 pub fn end_filter(mut self) -> RouteBuilder {
1046 let step = BuilderStep::Filter {
1047 predicate: self.predicate,
1048 steps: self.steps,
1049 };
1050 self.parent.steps.push(step);
1051 self.parent
1052 }
1053}
1054
1055impl StepAccumulator for FilterBuilder {
1056 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1057 &mut self.steps
1058 }
1059}
1060
1061pub struct FilterInSplitBuilder {
1063 parent: SplitBuilder,
1064 predicate: FilterPredicate,
1065 steps: Vec<BuilderStep>,
1066}
1067
1068impl FilterInSplitBuilder {
1069 pub fn end_filter(mut self) -> SplitBuilder {
1071 let step = BuilderStep::Filter {
1072 predicate: self.predicate,
1073 steps: self.steps,
1074 };
1075 self.parent.steps.push(step);
1076 self.parent
1077 }
1078}
1079
1080impl StepAccumulator for FilterInSplitBuilder {
1081 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1082 &mut self.steps
1083 }
1084}
1085
1086pub struct ChoiceBuilder {
1093 parent: RouteBuilder,
1094 whens: Vec<WhenStep>,
1095 _otherwise: Option<Vec<BuilderStep>>,
1096}
1097
1098impl ChoiceBuilder {
1099 pub fn when<F>(self, predicate: F) -> WhenBuilder
1102 where
1103 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1104 {
1105 WhenBuilder {
1106 parent: self,
1107 predicate: std::sync::Arc::new(predicate),
1108 steps: vec![],
1109 }
1110 }
1111
1112 pub fn otherwise(self) -> OtherwiseBuilder {
1116 OtherwiseBuilder {
1117 parent: self,
1118 steps: vec![],
1119 }
1120 }
1121
1122 pub fn end_choice(mut self) -> RouteBuilder {
1126 let step = BuilderStep::Choice {
1127 whens: self.whens,
1128 otherwise: self._otherwise,
1129 };
1130 self.parent.steps.push(step);
1131 self.parent
1132 }
1133}
1134
1135pub struct WhenBuilder {
1137 parent: ChoiceBuilder,
1138 predicate: camel_api::FilterPredicate,
1139 steps: Vec<BuilderStep>,
1140}
1141
1142impl WhenBuilder {
1143 pub fn end_when(mut self) -> ChoiceBuilder {
1146 self.parent.whens.push(WhenStep {
1147 predicate: self.predicate,
1148 steps: self.steps,
1149 });
1150 self.parent
1151 }
1152}
1153
1154impl StepAccumulator for WhenBuilder {
1155 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1156 &mut self.steps
1157 }
1158}
1159
1160pub struct OtherwiseBuilder {
1162 parent: ChoiceBuilder,
1163 steps: Vec<BuilderStep>,
1164}
1165
1166impl OtherwiseBuilder {
1167 pub fn end_otherwise(self) -> ChoiceBuilder {
1169 let OtherwiseBuilder { mut parent, steps } = self;
1170 parent._otherwise = Some(steps);
1171 parent
1172 }
1173}
1174
1175impl StepAccumulator for OtherwiseBuilder {
1176 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1177 &mut self.steps
1178 }
1179}
1180
1181pub struct MulticastBuilder {
1189 parent: RouteBuilder,
1190 steps: Vec<BuilderStep>,
1191 config: MulticastConfig,
1192}
1193
1194impl MulticastBuilder {
1195 pub fn parallel(mut self, parallel: bool) -> Self {
1196 self.config = self.config.parallel(parallel);
1197 self
1198 }
1199
1200 pub fn parallel_limit(mut self, limit: usize) -> Self {
1201 self.config = self.config.parallel_limit(limit);
1202 self
1203 }
1204
1205 pub fn stop_on_exception(mut self, stop: bool) -> Self {
1206 self.config = self.config.stop_on_exception(stop);
1207 self
1208 }
1209
1210 pub fn timeout(mut self, duration: std::time::Duration) -> Self {
1211 self.config = self.config.timeout(duration);
1212 self
1213 }
1214
1215 pub fn aggregation(mut self, strategy: MulticastStrategy) -> Self {
1216 self.config = self.config.aggregation(strategy);
1217 self
1218 }
1219
1220 pub fn end_multicast(mut self) -> RouteBuilder {
1221 let step = BuilderStep::Multicast {
1222 steps: self.steps,
1223 config: self.config,
1224 };
1225 self.parent.steps.push(step);
1226 self.parent
1227 }
1228}
1229
1230impl StepAccumulator for MulticastBuilder {
1231 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1232 &mut self.steps
1233 }
1234}
1235
1236pub struct ThrottleBuilder {
1244 parent: RouteBuilder,
1245 config: ThrottlerConfig,
1246 steps: Vec<BuilderStep>,
1247}
1248
1249impl ThrottleBuilder {
1250 pub fn strategy(mut self, strategy: ThrottleStrategy) -> Self {
1256 self.config = self.config.strategy(strategy);
1257 self
1258 }
1259
1260 pub fn end_throttle(mut self) -> RouteBuilder {
1263 let step = BuilderStep::Throttle {
1264 config: self.config,
1265 steps: self.steps,
1266 };
1267 self.parent.steps.push(step);
1268 self.parent
1269 }
1270}
1271
1272impl StepAccumulator for ThrottleBuilder {
1273 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1274 &mut self.steps
1275 }
1276}
1277
1278pub struct LoopBuilder {
1280 parent: RouteBuilder,
1281 config: LoopConfig,
1282 steps: Vec<BuilderStep>,
1283}
1284
1285impl LoopBuilder {
1286 pub fn loop_count(self, count: usize) -> LoopInLoopBuilder {
1287 LoopInLoopBuilder {
1288 parent: self,
1289 config: LoopConfig {
1290 mode: LoopMode::Count(count),
1291 },
1292 steps: vec![],
1293 }
1294 }
1295
1296 pub fn loop_while<F>(self, predicate: F) -> LoopInLoopBuilder
1297 where
1298 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1299 {
1300 LoopInLoopBuilder {
1301 parent: self,
1302 config: LoopConfig {
1303 mode: LoopMode::While(std::sync::Arc::new(predicate)),
1304 },
1305 steps: vec![],
1306 }
1307 }
1308
1309 pub fn end_loop(mut self) -> RouteBuilder {
1310 let step = BuilderStep::Loop {
1311 config: self.config,
1312 steps: self.steps,
1313 };
1314 self.parent.steps.push(step);
1315 self.parent
1316 }
1317}
1318
1319impl StepAccumulator for LoopBuilder {
1320 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1321 &mut self.steps
1322 }
1323}
1324
1325pub struct LoopInLoopBuilder {
1326 parent: LoopBuilder,
1327 config: LoopConfig,
1328 steps: Vec<BuilderStep>,
1329}
1330
1331impl LoopInLoopBuilder {
1332 pub fn end_loop(mut self) -> LoopBuilder {
1333 let step = BuilderStep::Loop {
1334 config: self.config,
1335 steps: self.steps,
1336 };
1337 self.parent.steps.push(step);
1338 self.parent
1339 }
1340}
1341
1342impl StepAccumulator for LoopInLoopBuilder {
1343 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1344 &mut self.steps
1345 }
1346}
1347
1348pub struct LoadBalancerBuilder {
1356 parent: RouteBuilder,
1357 config: LoadBalancerConfig,
1358 steps: Vec<BuilderStep>,
1359}
1360
1361impl LoadBalancerBuilder {
1362 pub fn round_robin(mut self) -> Self {
1364 self.config = LoadBalancerConfig::round_robin();
1365 self
1366 }
1367
1368 pub fn random(mut self) -> Self {
1370 self.config = LoadBalancerConfig::random();
1371 self
1372 }
1373
1374 pub fn weighted(mut self, weights: Vec<(String, u32)>) -> Self {
1379 self.config = LoadBalancerConfig::weighted(weights);
1380 self
1381 }
1382
1383 pub fn failover(mut self) -> Self {
1388 self.config = LoadBalancerConfig::failover();
1389 self
1390 }
1391
1392 pub fn parallel(mut self, parallel: bool) -> Self {
1397 self.config = self.config.parallel(parallel);
1398 self
1399 }
1400
1401 pub fn end_load_balance(mut self) -> RouteBuilder {
1404 let step = BuilderStep::LoadBalance {
1405 config: self.config,
1406 steps: self.steps,
1407 };
1408 self.parent.steps.push(step);
1409 self.parent
1410 }
1411}
1412
1413impl StepAccumulator for LoadBalancerBuilder {
1414 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1415 &mut self.steps
1416 }
1417}
1418
1419#[cfg(test)]
1424mod tests {
1425 use super::*;
1426 use camel_api::error_handler::ErrorHandlerConfig;
1427 use camel_api::load_balancer::LoadBalanceStrategy;
1428 use camel_api::{Exchange, Message};
1429 use camel_core::route::BuilderStep;
1430 use std::sync::Arc;
1431 use std::time::Duration;
1432 use tower::{Service, ServiceExt};
1433
/// A built route reports the URI it was created from.
#[test]
fn test_builder_from_creates_definition() {
    let builder = RouteBuilder::from("timer:tick").route_id("test-route");
    let route = builder.build().unwrap();
    assert_eq!(route.from_uri(), "timer:tick");
}
1442
/// An empty source URI makes `build` return an error.
#[test]
fn test_builder_empty_from_uri_errors() {
    let builder = RouteBuilder::from("").route_id("test-route");
    assert!(builder.build().is_err());
}
1448
/// `.to(uri)` adds a `To` step carrying that URI and leaves the source URI intact.
#[test]
fn test_builder_to_adds_step() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .to("log:info")
        .build()
        .unwrap();

    assert_eq!(route.from_uri(), "timer:tick");

    let first = &route.steps()[0];
    assert!(matches!(first, BuilderStep::To(uri) if uri == "log:info"));
}
1461
/// A `filter(..)` … `end_filter()` section appears as a `Filter` step on the route.
#[test]
fn test_builder_filter_adds_filter_step() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .filter(|_ex| true)
        .to("mock:result")
        .end_filter()
        .build()
        .unwrap();

    let first = &route.steps()[0];
    assert!(matches!(first, BuilderStep::Filter { .. }));
}
1474
/// `set_header` is recorded as a `Processor` step.
#[test]
fn test_builder_set_header_adds_processor_step() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .set_header("key", Value::String("value".into()))
        .build()
        .unwrap();

    let first = &route.steps()[0];
    assert!(matches!(first, BuilderStep::Processor(_)));
}
1485
/// `map_body` is recorded as a `Processor` step.
#[test]
fn test_builder_map_body_adds_processor_step() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .map_body(|body| body)
        .build()
        .unwrap();

    let first = &route.steps()[0];
    assert!(matches!(first, BuilderStep::Processor(_)));
}
1496
/// `process` with an async closure is recorded as a `Processor` step.
#[test]
fn test_builder_process_adds_processor_step() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .process(|ex| async move { Ok(ex) })
        .build()
        .unwrap();

    let first = &route.steps()[0];
    assert!(matches!(first, BuilderStep::Processor(_)));
}
1507
/// Chained calls produce top-level steps in call order: processor, filter, `To`.
#[test]
fn test_builder_chain_multiple_steps() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .set_header("source", Value::String("timer".into()))
        .filter(|ex| ex.input.header("source").is_some())
        .to("log:info")
        .end_filter()
        .to("mock:result")
        .build()
        .unwrap();

    let steps = route.steps();
    assert_eq!(steps.len(), 3);
    assert!(matches!(&steps[0], BuilderStep::Processor(_)));
    assert!(matches!(&steps[1], BuilderStep::Filter { .. }));
    assert!(matches!(&steps[2], BuilderStep::To(uri) if uri == "mock:result"));
}
1525
/// `loop_count(3)` yields a `Loop` step in `Count(3)` mode holding the inner
/// step, followed by the step added after `end_loop()`.
#[test]
fn test_loop_count_builder() {
    use camel_api::loop_eip::LoopMode;

    let route = RouteBuilder::from("direct:start")
        .route_id("loop-test")
        .loop_count(3)
        .to("mock:inside")
        .end_loop()
        .to("mock:after")
        .build()
        .unwrap();

    let steps = route.steps();
    assert_eq!(steps.len(), 2);

    let first = &steps[0];
    if let BuilderStep::Loop {
        config,
        steps: body,
    } = first
    {
        assert!(matches!(config.mode, LoopMode::Count(3)));
        assert_eq!(body.len(), 1);
    } else {
        panic!("Expected Loop, got {:?}", first);
    }

    assert!(matches!(steps[1], BuilderStep::To(_)));
}
1549
/// `loop_while(..)` yields a single `Loop` step in `While` mode holding the inner step.
#[test]
fn test_loop_while_builder() {
    use camel_api::loop_eip::LoopMode;

    let route = RouteBuilder::from("direct:start")
        .route_id("loop-while-test")
        .loop_while(|_ex| true)
        .to("mock:retry")
        .end_loop()
        .build()
        .unwrap();

    let steps = route.steps();
    assert_eq!(steps.len(), 1);

    let first = &steps[0];
    if let BuilderStep::Loop {
        config,
        steps: body,
    } = first
    {
        assert!(matches!(config.mode, LoopMode::While(_)));
        assert_eq!(body.len(), 1);
    } else {
        panic!("Expected Loop, got {:?}", first);
    }
}
1571
/// A loop opened inside a loop becomes the second step of the outer loop body
/// and keeps its own `Count(3)` mode and single inner step.
#[test]
fn test_nested_loop_builder() {
    use camel_api::loop_eip::LoopMode;

    let route = RouteBuilder::from("direct:start")
        .route_id("nested-loop-test")
        .loop_count(2)
        .to("mock:outer")
        .loop_count(3)
        .to("mock:inner")
        .end_loop()
        .end_loop()
        .to("mock:after")
        .build()
        .unwrap();

    let top_level = route.steps();
    assert_eq!(top_level.len(), 2);

    let outer = &top_level[0];
    if let BuilderStep::Loop {
        steps: outer_body, ..
    } = outer
    {
        assert_eq!(outer_body.len(), 2);

        let inner = &outer_body[1];
        if let BuilderStep::Loop {
            config,
            steps: inner_steps,
        } = inner
        {
            assert!(matches!(config.mode, LoopMode::Count(3)));
            assert_eq!(inner_steps.len(), 1);
        } else {
            panic!("Expected nested Loop, got {:?}", inner);
        }
    } else {
        panic!("Expected outer Loop, got {:?}", outer);
    }
}
1606
1607 #[tokio::test]
1612 async fn test_set_header_processor_works() {
1613 let mut svc = SetHeader::new(IdentityProcessor, "greeting", Value::String("hello".into()));
1614 let exchange = Exchange::new(Message::new("test"));
1615 let result = svc.call(exchange).await.unwrap();
1616 assert_eq!(
1617 result.input.header("greeting"),
1618 Some(&Value::String("hello".into()))
1619 );
1620 }
1621
1622 #[tokio::test]
1623 async fn test_filter_processor_passes() {
1624 use camel_api::BoxProcessorExt;
1625 use camel_processor::FilterService;
1626
1627 let sub = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
1628 let mut svc =
1629 FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
1630 let exchange = Exchange::new(Message::new("pass"));
1631 let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
1632 assert_eq!(result.input.body.as_text(), Some("pass"));
1633 }
1634
1635 #[tokio::test]
1636 async fn test_filter_processor_blocks() {
1637 use camel_api::BoxProcessorExt;
1638 use camel_processor::FilterService;
1639
1640 let sub = BoxProcessor::from_fn(|_ex| {
1641 Box::pin(async move { Err(CamelError::ProcessorError("should not reach".into())) })
1642 });
1643 let mut svc =
1644 FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
1645 let exchange = Exchange::new(Message::new("reject"));
1646 let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
1647 assert_eq!(result.input.body.as_text(), Some("reject"));
1648 }
1649
1650 #[tokio::test]
1651 async fn test_map_body_processor_works() {
1652 let mapper = MapBody::new(IdentityProcessor, |body: Body| {
1653 if let Some(text) = body.as_text() {
1654 Body::Text(text.to_uppercase())
1655 } else {
1656 body
1657 }
1658 });
1659 let exchange = Exchange::new(Message::new("hello"));
1660 let result = mapper.oneshot(exchange).await.unwrap();
1661 assert_eq!(result.input.body.as_text(), Some("HELLO"));
1662 }
1663
1664 #[tokio::test]
1665 async fn test_process_custom_processor_works() {
1666 let processor = ProcessorFn::new(|mut ex: Exchange| async move {
1667 ex.set_property("custom", Value::Bool(true));
1668 Ok(ex)
1669 });
1670 let exchange = Exchange::new(Message::default());
1671 let result = processor.oneshot(exchange).await.unwrap();
1672 assert_eq!(result.property("custom"), Some(&Value::Bool(true)));
1673 }
1674
1675 #[tokio::test]
1680 async fn test_compose_pipeline_runs_steps_in_order() {
1681 use camel_core::route::compose_pipeline;
1682
1683 let processors = vec![
1684 BoxProcessor::new(SetHeader::new(
1685 IdentityProcessor,
1686 "step",
1687 Value::String("one".into()),
1688 )),
1689 BoxProcessor::new(MapBody::new(IdentityProcessor, |body: Body| {
1690 if let Some(text) = body.as_text() {
1691 Body::Text(format!("{}-processed", text))
1692 } else {
1693 body
1694 }
1695 })),
1696 ];
1697
1698 let pipeline = compose_pipeline(processors);
1699 let exchange = Exchange::new(Message::new("hello"));
1700 let result = pipeline.oneshot(exchange).await.unwrap();
1701
1702 assert_eq!(
1703 result.input.header("step"),
1704 Some(&Value::String("one".into()))
1705 );
1706 assert_eq!(result.input.body.as_text(), Some("hello-processed"));
1707 }
1708
1709 #[tokio::test]
1710 async fn test_compose_pipeline_empty_is_identity() {
1711 use camel_core::route::compose_pipeline;
1712
1713 let pipeline = compose_pipeline(vec![]);
1714 let exchange = Exchange::new(Message::new("unchanged"));
1715 let result = pipeline.oneshot(exchange).await.unwrap();
1716 assert_eq!(result.input.body.as_text(), Some("unchanged"));
1717 }
1718
/// The circuit breaker configuration passed to the builder is available on
/// the built route with its failure threshold intact.
#[test]
fn test_builder_circuit_breaker_sets_config() {
    use camel_api::circuit_breaker::CircuitBreakerConfig;

    let breaker = CircuitBreakerConfig::new().failure_threshold(5);
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .circuit_breaker(breaker)
        .build()
        .unwrap();

    let stored = route
        .circuit_breaker_config()
        .expect("circuit breaker should be set");
    assert_eq!(stored.failure_threshold, 5);
}
1739
/// A route can carry a circuit breaker and an explicit error handler together.
///
/// Both configurations are checked on the built definition, so a regression
/// in which setting one drops the other is caught.
#[test]
fn test_builder_circuit_breaker_with_error_handler() {
    use camel_api::circuit_breaker::CircuitBreakerConfig;
    use camel_api::error_handler::ErrorHandlerConfig;

    let cb_config = CircuitBreakerConfig::new().failure_threshold(3);
    let eh_config = ErrorHandlerConfig::log_only();

    let definition = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .to("log:info")
        .circuit_breaker(cb_config)
        .error_handler(eh_config)
        .build()
        .unwrap();

    assert!(
        definition.circuit_breaker_config().is_some(),
        "circuit breaker config should be set"
    );

    // The test name promises an error handler as well; previously only the
    // circuit breaker was verified.
    let _ = definition
        .error_handler_config()
        .expect("error handler should be set");
}
1762
/// Two `on_exception` clauses end up as two policies in declaration order,
/// next to the dead letter channel URI.
#[test]
fn test_builder_on_exception_shorthand_multiple_clauses_preserve_order() {
    let route = RouteBuilder::from("direct:start")
        .route_id("test-route")
        .dead_letter_channel("log:dlc")
        .on_exception(|e| matches!(e, CamelError::Io(_)))
        .retry(3)
        .handled_by("log:io")
        .end_on_exception()
        .on_exception(|e| matches!(e, CamelError::ProcessorError(_)))
        .retry(1)
        .end_on_exception()
        .to("mock:out")
        .build()
        .expect("route should build");

    let handler = route
        .error_handler_config()
        .expect("error handler should be set");
    assert_eq!(handler.policies.len(), 2);
    assert_eq!(handler.dlc_uri.as_deref(), Some("log:dlc"));

    let first = &handler.policies[0];
    assert_eq!(first.retry.as_ref().map(|p| p.max_attempts), Some(3));
    assert_eq!(first.handled_by.as_deref(), Some("log:io"));

    let second = &handler.policies[1];
    assert_eq!(second.retry.as_ref().map(|p| p.max_attempts), Some(1));
}
1794
/// Combining an explicit `error_handler` with the `on_exception` shorthand
/// makes `build` fail with a "mixed error handler modes" error.
#[test]
fn test_builder_on_exception_mixed_mode_rejected() {
    let outcome = RouteBuilder::from("direct:start")
        .route_id("test-route")
        .error_handler(ErrorHandlerConfig::log_only())
        .on_exception(|_e| true)
        .end_on_exception()
        .to("mock:out")
        .build();

    let err = outcome
        .err()
        .expect("mixed mode should fail with an error");

    let rendered = format!("{err}");
    assert!(
        rendered.contains("mixed error handler modes"),
        "unexpected error: {err}"
    );
}
1812
/// Backoff and jitter settings without a preceding `retry` still create one
/// policy, but leave its retry configuration unset.
#[test]
fn test_builder_on_exception_backoff_and_jitter_without_retry_noop() {
    let initial = Duration::from_millis(5);
    let ceiling = Duration::from_millis(100);

    let route = RouteBuilder::from("direct:start")
        .route_id("test-route")
        .on_exception(|_e| true)
        .with_backoff(initial, 3.0, ceiling)
        .with_jitter(0.5)
        .end_on_exception()
        .to("mock:out")
        .build()
        .expect("route should build");

    let handler = route
        .error_handler_config()
        .expect("error handler should be set");
    assert_eq!(handler.policies.len(), 1);
    assert!(handler.policies[0].retry.is_none());
}
1831
/// `dead_letter_channel` alone produces an error handler with the DLC URI and
/// no policies.
#[test]
fn test_builder_dead_letter_channel_without_on_exception_sets_dlc() {
    let route = RouteBuilder::from("direct:start")
        .route_id("test-route")
        .dead_letter_channel("log:dlc")
        .to("mock:out")
        .build()
        .expect("route should build");

    let handler = route
        .error_handler_config()
        .expect("error handler should be set");
    assert_eq!(handler.dlc_uri.as_deref(), Some("log:dlc"));
    assert!(handler.policies.is_empty());
}
1847
/// A second `dead_letter_channel` call wins for the URI, while the policy
/// declared between the two calls is kept.
#[test]
fn test_builder_dead_letter_channel_called_twice_uses_latest_and_keeps_policies() {
    let route = RouteBuilder::from("direct:start")
        .route_id("test-route")
        .dead_letter_channel("log:first")
        .on_exception(|e| matches!(e, CamelError::Io(_)))
        .retry(2)
        .end_on_exception()
        .dead_letter_channel("log:second")
        .to("mock:out")
        .build()
        .expect("route should build");

    let handler = route
        .error_handler_config()
        .expect("error handler should be set");
    assert_eq!(handler.dlc_uri.as_deref(), Some("log:second"));
    assert_eq!(handler.policies.len(), 1);

    let attempts = handler.policies[0].retry.as_ref().map(|p| p.max_attempts);
    assert_eq!(attempts, Some(2));
}
1871
/// `on_exception` without a dead letter channel yields an error handler with
/// one policy and no DLC URI.
#[test]
fn test_builder_on_exception_without_dlc_defaults_to_log_only() {
    let route = RouteBuilder::from("direct:start")
        .route_id("test-route")
        .on_exception(|e| matches!(e, CamelError::ProcessorError(_)))
        .retry(1)
        .end_on_exception()
        .to("mock:out")
        .build()
        .expect("route should build");

    let handler = route
        .error_handler_config()
        .expect("error handler should be set");
    assert!(handler.dlc_uri.is_none());
    assert_eq!(handler.policies.len(), 1);
}
1889
/// Calling `error_handler` twice builds successfully and keeps the second
/// configuration.
#[test]
fn test_builder_error_handler_explicit_overwrite_stays_explicit_mode() {
    let route = RouteBuilder::from("direct:start")
        .route_id("test-route")
        .error_handler(ErrorHandlerConfig::dead_letter_channel("log:first"))
        .error_handler(ErrorHandlerConfig::dead_letter_channel("log:second"))
        .to("mock:out")
        .build()
        .expect("route should build");

    let handler = route
        .error_handler_config()
        .expect("error handler should be set");
    assert_eq!(handler.dlc_uri.as_deref(), Some("log:second"));
}
1908
/// After `end_split()` the chain continues on the route: the split section and
/// the trailing `to` give two top-level steps.
#[test]
fn test_split_builder_typestate() {
    use camel_api::splitter::{SplitterConfig, split_body_lines};

    let splitter = SplitterConfig::new(split_body_lines());
    let route = RouteBuilder::from("timer:test?period=1000")
        .route_id("test-route")
        .split(splitter)
        .to("mock:per-fragment")
        .end_split()
        .to("mock:final")
        .build()
        .unwrap();

    assert_eq!(route.steps().len(), 2);
}
1928
/// Steps added inside the split section are stored on the `Split` step, not
/// on the route itself.
#[test]
fn test_split_builder_steps_collected() {
    use camel_api::splitter::{SplitterConfig, split_body_lines};

    let splitter = SplitterConfig::new(split_body_lines());
    let route = RouteBuilder::from("timer:test?period=1000")
        .route_id("test-route")
        .split(splitter)
        .set_header("fragment", Value::String("yes".into()))
        .to("mock:per-fragment")
        .end_split()
        .build()
        .unwrap();

    let top_level = route.steps();
    assert_eq!(top_level.len(), 1);

    let first = &top_level[0];
    if let BuilderStep::Split { steps, .. } = first {
        assert_eq!(steps.len(), 2);
    } else {
        panic!("Expected Split, got {:?}", first);
    }
}
1951
/// The splitter configuration (parallel flag, limit, aggregation) reaches the
/// `Split` step unchanged.
#[test]
fn test_split_builder_config_propagated() {
    use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};

    let splitter = SplitterConfig::new(split_body_lines())
        .parallel(true)
        .parallel_limit(4)
        .aggregation(AggregationStrategy::CollectAll);

    let route = RouteBuilder::from("timer:test?period=1000")
        .route_id("test-route")
        .split(splitter)
        .to("mock:per-fragment")
        .end_split()
        .build()
        .unwrap();

    let first = &route.steps()[0];
    if let BuilderStep::Split { config, .. } = first {
        assert!(config.parallel);
        assert_eq!(config.parallel_limit, Some(4));
        assert!(matches!(
            config.aggregation,
            AggregationStrategy::CollectAll
        ));
    } else {
        panic!("Expected Split, got {:?}", first);
    }
}
1981
/// `aggregate` adds exactly one `Aggregate` step to the route.
#[test]
fn test_aggregate_builder_adds_step() {
    use camel_api::aggregator::AggregatorConfig;
    use camel_core::route::BuilderStep;

    let aggregator = AggregatorConfig::correlate_by("key")
        .complete_when_size(2)
        .build();

    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .aggregate(aggregator)
        .build()
        .unwrap();

    let steps = route.steps();
    assert_eq!(steps.len(), 1);
    assert!(matches!(steps[0], BuilderStep::Aggregate { .. }));
}
2003
/// `aggregate` called inside a split section is stored as the first step of
/// the `Split` step.
#[test]
fn test_aggregate_in_split_builder() {
    use camel_api::aggregator::AggregatorConfig;
    use camel_api::splitter::{SplitterConfig, split_body_lines};
    use camel_core::route::BuilderStep;

    let aggregator = AggregatorConfig::correlate_by("key")
        .complete_when_size(1)
        .build();

    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .split(SplitterConfig::new(split_body_lines()))
        .aggregate(aggregator)
        .end_split()
        .build()
        .unwrap();

    let top_level = route.steps();
    assert_eq!(top_level.len(), 1);

    match &top_level[0] {
        BuilderStep::Split { steps, .. } => {
            assert!(matches!(steps[0], BuilderStep::Aggregate { .. }));
        }
        _ => panic!("expected Split step"),
    }
}
2029
/// `set_body` with a fixed value is recorded as a `Processor` step.
#[test]
fn test_builder_set_body_static_adds_processor() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .set_body("fixed")
        .build()
        .unwrap();

    let first = &route.steps()[0];
    assert!(matches!(first, BuilderStep::Processor(_)));
}
2041
/// `set_body_fn` with a closure is recorded as a `Processor` step.
#[test]
fn test_builder_set_body_fn_adds_processor() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .set_body_fn(|_ex: &Exchange| Body::Text("dynamic".into()))
        .build()
        .unwrap();

    let first = &route.steps()[0];
    assert!(matches!(first, BuilderStep::Processor(_)));
}
2051
/// `transform` and `set_body` with the same value produce routes with the
/// same number of steps.
#[test]
fn transform_alias_produces_same_as_set_body() {
    let via_transform = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .transform("hello")
        .build()
        .unwrap();
    let transform_len = via_transform.steps().len();

    let via_set_body = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .set_body("hello")
        .build()
        .unwrap();
    let set_body_len = via_set_body.steps().len();

    assert_eq!(transform_len, set_body_len);
}
2068
/// `set_header_fn` with a closure is recorded as a `Processor` step.
#[test]
fn test_builder_set_header_fn_adds_processor() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .set_header_fn("k", |_ex: &Exchange| Value::String("v".into()))
        .build()
        .unwrap();

    let first = &route.steps()[0];
    assert!(matches!(first, BuilderStep::Processor(_)));
}
2078
2079 #[tokio::test]
2080 async fn test_set_body_static_processor_works() {
2081 use camel_core::route::compose_pipeline;
2082 let def = RouteBuilder::from("t:t")
2083 .route_id("test-route")
2084 .set_body("replaced")
2085 .build()
2086 .unwrap();
2087 let pipeline = compose_pipeline(
2088 def.steps()
2089 .iter()
2090 .filter_map(|s| {
2091 if let BuilderStep::Processor(p) = s {
2092 Some(p.clone())
2093 } else {
2094 None
2095 }
2096 })
2097 .collect(),
2098 );
2099 let exchange = Exchange::new(Message::new("original"));
2100 let result = pipeline.oneshot(exchange).await.unwrap();
2101 assert_eq!(result.input.body.as_text(), Some("replaced"));
2102 }
2103
2104 #[tokio::test]
2105 async fn test_set_body_fn_processor_works() {
2106 use camel_core::route::compose_pipeline;
2107 let def = RouteBuilder::from("t:t")
2108 .route_id("test-route")
2109 .set_body_fn(|ex: &Exchange| {
2110 Body::Text(ex.input.body.as_text().unwrap_or("").to_uppercase())
2111 })
2112 .build()
2113 .unwrap();
2114 let pipeline = compose_pipeline(
2115 def.steps()
2116 .iter()
2117 .filter_map(|s| {
2118 if let BuilderStep::Processor(p) = s {
2119 Some(p.clone())
2120 } else {
2121 None
2122 }
2123 })
2124 .collect(),
2125 );
2126 let exchange = Exchange::new(Message::new("hello"));
2127 let result = pipeline.oneshot(exchange).await.unwrap();
2128 assert_eq!(result.input.body.as_text(), Some("HELLO"));
2129 }
2130
2131 #[tokio::test]
2132 async fn test_set_header_fn_processor_works() {
2133 use camel_core::route::compose_pipeline;
2134 let def = RouteBuilder::from("t:t")
2135 .route_id("test-route")
2136 .set_header_fn("echo", |ex: &Exchange| {
2137 ex.input
2138 .body
2139 .as_text()
2140 .map(|t| Value::String(t.into()))
2141 .unwrap_or(Value::Null)
2142 })
2143 .build()
2144 .unwrap();
2145 let pipeline = compose_pipeline(
2146 def.steps()
2147 .iter()
2148 .filter_map(|s| {
2149 if let BuilderStep::Processor(p) = s {
2150 Some(p.clone())
2151 } else {
2152 None
2153 }
2154 })
2155 .collect(),
2156 );
2157 let exchange = Exchange::new(Message::new("ping"));
2158 let result = pipeline.oneshot(exchange).await.unwrap();
2159 assert_eq!(
2160 result.input.header("echo"),
2161 Some(&Value::String("ping".into()))
2162 );
2163 }
2164
/// A route with a filter section followed by an outer step builds successfully.
#[test]
fn test_filter_builder_typestate() {
    let outcome = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
        .route_id("test-route")
        .filter(|_ex| true)
        .to("mock:inner")
        .end_filter()
        .to("mock:outer")
        .build();

    assert!(outcome.is_ok());
}
2178
/// A filter section contributes exactly one top-level `Filter` step.
#[test]
fn test_filter_builder_steps_collected() {
    let route = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
        .route_id("test-route")
        .filter(|_ex| true)
        .to("mock:inner")
        .end_filter()
        .build()
        .unwrap();

    let steps = route.steps();
    assert_eq!(steps.len(), 1);
    assert!(matches!(&steps[0], BuilderStep::Filter { .. }));
}
2192
/// `wire_tap` adds a `WireTap` step with the tap URI, ahead of the following `To` step.
#[test]
fn test_wire_tap_builder_adds_step() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .wire_tap("mock:tap")
        .to("mock:result")
        .build()
        .unwrap();

    let steps = route.steps();
    assert_eq!(steps.len(), 2);
    assert!(matches!(&steps[0], BuilderStep::WireTap { uri } if uri == "mock:tap"));
    assert!(matches!(&steps[1], BuilderStep::To(uri) if uri == "mock:result"));
}
2208
/// After `end_multicast()` the chain continues on the route: the multicast
/// section and the trailing `to` give two top-level steps.
#[test]
fn test_multicast_builder_typestate() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .multicast()
        .to("direct:a")
        .to("direct:b")
        .end_multicast()
        .to("mock:result")
        .build()
        .unwrap();

    let top_level = route.steps();
    assert_eq!(top_level.len(), 2);
}
2225
/// Both endpoints added inside the multicast section are stored on the
/// `Multicast` step.
#[test]
fn test_multicast_builder_steps_collected() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .multicast()
        .to("direct:a")
        .to("direct:b")
        .end_multicast()
        .build()
        .unwrap();

    let first = &route.steps()[0];
    if let BuilderStep::Multicast { steps, .. } = first {
        assert_eq!(steps.len(), 2);
    } else {
        panic!("Expected Multicast, got {:?}", first);
    }
}
2244
/// `concurrent(16)` is stored as `ConcurrencyModel::Concurrent { max: Some(16) }`.
#[test]
fn test_builder_concurrent_sets_concurrency() {
    use camel_component_api::ConcurrencyModel;

    let route = RouteBuilder::from("http://0.0.0.0:8080/test")
        .route_id("test-route")
        .concurrent(16)
        .to("log:info")
        .build()
        .unwrap();

    let expected = ConcurrencyModel::Concurrent { max: Some(16) };
    assert_eq!(route.concurrency_override(), Some(&expected));
}
2263
/// `concurrent(0)` is stored as `ConcurrencyModel::Concurrent { max: None }`.
#[test]
fn test_builder_concurrent_zero_means_unbounded() {
    use camel_component_api::ConcurrencyModel;

    let route = RouteBuilder::from("http://0.0.0.0:8080/test")
        .route_id("test-route")
        .concurrent(0)
        .to("log:info")
        .build()
        .unwrap();

    let expected = ConcurrencyModel::Concurrent { max: None };
    assert_eq!(route.concurrency_override(), Some(&expected));
}
2280
/// `sequential()` is stored as `ConcurrencyModel::Sequential`.
#[test]
fn test_builder_sequential_sets_concurrency() {
    use camel_component_api::ConcurrencyModel;

    let route = RouteBuilder::from("http://0.0.0.0:8080/test")
        .route_id("test-route")
        .sequential()
        .to("log:info")
        .build()
        .unwrap();

    let expected = ConcurrencyModel::Sequential;
    assert_eq!(route.concurrency_override(), Some(&expected));
}
2297
/// Without `concurrent`/`sequential`, the route carries no concurrency override.
#[test]
fn test_builder_default_concurrency_is_none() {
    let builder = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .to("log:info");
    let route = builder.build().unwrap();

    assert_eq!(route.concurrency_override(), None);
}
2308
/// The id passed to `route_id` is reported by the built route.
#[test]
fn test_builder_route_id_sets_id() {
    let builder = RouteBuilder::from("timer:tick").route_id("my-route");
    let route = builder.build().unwrap();

    assert_eq!(route.route_id(), "my-route");
}
2320
/// `build` without a route id fails, and the error text mentions `route_id`.
#[test]
fn test_build_without_route_id_fails() {
    let outcome = RouteBuilder::from("timer:tick?period=1000")
        .to("log:info")
        .build();

    let message = if let Err(error) = outcome {
        error.to_string()
    } else {
        panic!("build() should fail without route_id")
    };

    assert!(
        message.contains("route_id"),
        "error should mention route_id, got: {}",
        message
    );
}
2336
/// `auto_startup(false)` is reflected on the built route.
#[test]
fn test_builder_auto_startup_false() {
    let builder = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .auto_startup(false);
    let route = builder.build().unwrap();

    assert!(!route.auto_startup());
}
2347
/// A custom `startup_order` is reflected on the built route.
#[test]
fn test_builder_startup_order_custom() {
    let builder = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .startup_order(50);
    let route = builder.build().unwrap();

    assert_eq!(route.startup_order(), 50);
}
2358
/// With only a route id set, the route auto-starts and has startup order 1000.
#[test]
fn test_builder_defaults() {
    let builder = RouteBuilder::from("timer:tick").route_id("test-route");
    let route = builder.build().unwrap();

    assert_eq!(route.route_id(), "test-route");
    assert!(route.auto_startup());
    assert_eq!(route.startup_order(), 1000);
}
2370
/// A choice with one `when` and no `otherwise` yields a single `Choice` step
/// with one branch and no fallback.
#[test]
fn test_choice_builder_single_when() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .choice()
        .when(|ex: &Exchange| ex.input.header("type").is_some())
        .to("mock:typed")
        .end_when()
        .end_choice()
        .build()
        .unwrap();

    let steps = route.steps();
    assert_eq!(steps.len(), 1);

    let first = &steps[0];
    assert!(
        matches!(first, BuilderStep::Choice { whens, otherwise }
            if whens.len() == 1 && otherwise.is_none())
    );
}
2390
/// A choice with one `when` plus `otherwise` yields a `Choice` step with one
/// branch and a fallback.
#[test]
fn test_choice_builder_when_otherwise() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .choice()
        .when(|ex: &Exchange| ex.input.header("a").is_some())
        .to("mock:a")
        .end_when()
        .otherwise()
        .to("mock:fallback")
        .end_otherwise()
        .end_choice()
        .build()
        .unwrap();

    let first = &route.steps()[0];
    assert!(
        matches!(first, BuilderStep::Choice { whens, otherwise }
            if whens.len() == 1 && otherwise.is_some())
    );
}
2410
/// Two `when` sections produce a `Choice` step with two branches.
#[test]
fn test_choice_builder_multiple_whens() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .choice()
        .when(|ex: &Exchange| ex.input.header("a").is_some())
        .to("mock:a")
        .end_when()
        .when(|ex: &Exchange| ex.input.header("b").is_some())
        .to("mock:b")
        .end_when()
        .end_choice()
        .build()
        .unwrap();

    let first = &route.steps()[0];
    assert!(
        matches!(first, BuilderStep::Choice { whens, .. }
            if whens.len() == 2)
    );
}
2430
/// A step added after `end_choice()` lands on the route, right after the choice.
#[test]
fn test_choice_step_after_choice() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .choice()
        .when(|_ex: &Exchange| true)
        .to("mock:inner")
        .end_when()
        .end_choice()
        .to("mock:outer")
        .build()
        .unwrap();

    let steps = route.steps();
    assert_eq!(steps.len(), 2);
    assert!(matches!(&steps[1], BuilderStep::To(uri) if uri == "mock:outer"));
}
2447
/// A throttle section contributes exactly one top-level `Throttle` step.
#[test]
fn test_throttle_builder_typestate() {
    let period = std::time::Duration::from_secs(1);
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .throttle(10, period)
        .to("mock:result")
        .end_throttle()
        .build()
        .unwrap();

    let steps = route.steps();
    assert_eq!(steps.len(), 1);
    assert!(matches!(&steps[0], BuilderStep::Throttle { .. }));
}
2466
/// The strategy chosen on the throttle builder is stored in the step's config.
#[test]
fn test_throttle_builder_with_strategy() {
    let period = std::time::Duration::from_secs(1);
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .throttle(10, period)
        .strategy(ThrottleStrategy::Reject)
        .to("mock:result")
        .end_throttle()
        .build()
        .unwrap();

    match &route.steps()[0] {
        BuilderStep::Throttle { config, .. } => {
            assert_eq!(config.strategy, ThrottleStrategy::Reject);
        }
        _ => panic!("Expected Throttle step"),
    }
}
2484
/// Steps added inside the throttle section are stored on the `Throttle` step.
#[test]
fn test_throttle_builder_steps_collected() {
    let period = std::time::Duration::from_secs(1);
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .throttle(5, period)
        .set_header("throttled", Value::Bool(true))
        .to("mock:throttled")
        .end_throttle()
        .build()
        .unwrap();

    let first = &route.steps()[0];
    if let BuilderStep::Throttle { steps, .. } = first {
        assert_eq!(steps.len(), 2);
    } else {
        panic!("Expected Throttle, got {:?}", first);
    }
}
2503
/// A step added after `end_throttle()` lands on the route, right after the
/// throttle section.
#[test]
fn test_throttle_step_after_throttle() {
    let period = std::time::Duration::from_secs(1);
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .throttle(10, period)
        .to("mock:inner")
        .end_throttle()
        .to("mock:outer")
        .build()
        .unwrap();

    let steps = route.steps();
    assert_eq!(steps.len(), 2);
    assert!(matches!(&steps[1], BuilderStep::To(uri) if uri == "mock:outer"));
}
2519
/// A load-balance section contributes exactly one top-level `LoadBalance` step.
#[test]
fn test_load_balance_builder_typestate() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .load_balance()
        .round_robin()
        .to("mock:a")
        .to("mock:b")
        .end_load_balance()
        .build()
        .unwrap();

    let steps = route.steps();
    assert_eq!(steps.len(), 1);
    assert!(matches!(&steps[0], BuilderStep::LoadBalance { .. }));
}
2540
/// Choosing `random()` stores `LoadBalanceStrategy::Random` in the step's config.
#[test]
fn test_load_balance_builder_with_strategy() {
    let route = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .load_balance()
        .random()
        .to("mock:result")
        .end_load_balance()
        .build()
        .unwrap();

    match &route.steps()[0] {
        BuilderStep::LoadBalance { config, .. } => {
            assert_eq!(config.strategy, LoadBalanceStrategy::Random);
        }
        _ => panic!("Expected LoadBalance step"),
    }
}
2558
/// Steps declared inside the load-balance scope are collected as the
/// step's nested steps.
#[test]
fn test_load_balance_builder_steps_collected() {
    let definition = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .load_balance()
        .set_header("lb", Value::Bool(true))
        .to("mock:a")
        .end_load_balance()
        .build()
        .unwrap();

    let first = &definition.steps()[0];
    let BuilderStep::LoadBalance { steps: nested, .. } = first else {
        panic!("Expected LoadBalance, got {:?}", first);
    };
    // Expect exactly the two steps added inside the scope: `set_header` and `to`.
    assert_eq!(nested.len(), 2);
}
2577
/// A step added after `end_load_balance()` is appended to the outer route.
#[test]
fn test_load_balance_step_after_load_balance() {
    let scoped = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .load_balance()
        .to("mock:inner");
    let definition = scoped
        .end_load_balance()
        .to("mock:outer")
        .build()
        .unwrap();

    let steps = definition.steps();
    assert_eq!(steps.len(), 2);
    assert!(matches!(&steps[1], BuilderStep::To(uri) if uri == "mock:outer"));
}
2593
/// `dynamic_router(..)` contributes exactly one `DynamicRouter` step.
#[test]
fn test_dynamic_router_builder() {
    let definition = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .dynamic_router(Arc::new(|_| Some(String::from("mock:result"))))
        .build()
        .unwrap();

    let steps = definition.steps();
    assert_eq!(steps.len(), 1);
    assert!(matches!(&steps[0], BuilderStep::DynamicRouter { .. }));
}
2610
/// A pre-built `DynamicRouterConfig` is carried through to the step with
/// its `max_iterations` and `cache_size` intact.
#[test]
fn test_dynamic_router_builder_with_config() {
    let router_config = DynamicRouterConfig::new(Arc::new(|_| Some(String::from("mock:a"))))
        .max_iterations(100)
        .cache_size(500);

    let definition = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .dynamic_router_with_config(router_config)
        .build()
        .unwrap();

    let steps = definition.steps();
    assert_eq!(steps.len(), 1);
    match &steps[0] {
        BuilderStep::DynamicRouter { config } => {
            assert_eq!(config.max_iterations, 100);
            assert_eq!(config.cache_size, 500);
        }
        _ => panic!("Expected DynamicRouter step"),
    }
}
2631
/// Steps chained after `dynamic_router(..)` follow it in order on the route.
#[test]
fn test_dynamic_router_step_after_router() {
    let routed = RouteBuilder::from("timer:tick")
        .route_id("test-route")
        .dynamic_router(Arc::new(|_| Some(String::from("mock:inner"))));
    let definition = routed.to("mock:outer").build().unwrap();

    let steps = definition.steps();
    assert_eq!(steps.len(), 2);
    assert!(matches!(&steps[0], BuilderStep::DynamicRouter { .. }));
    assert!(matches!(&steps[1], BuilderStep::To(uri) if uri == "mock:outer"));
}
2649
/// `routing_slip(..)` places a `RoutingSlip` step first on the route.
#[test]
fn routing_slip_builder_creates_step() {
    use camel_api::RoutingSlipExpression;

    let slip: RoutingSlipExpression = Arc::new(|_| Some(String::from("direct:a,direct:b")));

    let route = RouteBuilder::from("direct:start")
        .route_id("routing-slip-test")
        .routing_slip(slip)
        .build()
        .unwrap();

    let first = &route.steps()[0];
    assert!(
        matches!(first, BuilderStep::RoutingSlip { .. }),
        "Expected RoutingSlip step"
    );
}
2667
/// A customised `RoutingSlipConfig` keeps its delimiter, cache size and
/// invalid-endpoint flag when stored on the step.
#[test]
fn routing_slip_with_config_builder_creates_step() {
    use camel_api::RoutingSlipConfig;

    let slip_config = RoutingSlipConfig::new(Arc::new(|_| Some(String::from("mock:a"))))
        .uri_delimiter("|")
        .cache_size(50)
        .ignore_invalid_endpoints(true);

    let route = RouteBuilder::from("direct:start")
        .route_id("routing-slip-config-test")
        .routing_slip_with_config(slip_config)
        .build()
        .unwrap();

    match &route.steps()[0] {
        BuilderStep::RoutingSlip { config } => {
            assert_eq!(config.uri_delimiter, "|");
            assert_eq!(config.cache_size, 50);
            assert!(config.ignore_invalid_endpoints);
        }
        _ => panic!("Expected RoutingSlip step"),
    }
}
2691
/// `marshal("json")` is recorded as a `Processor` step.
#[test]
fn test_builder_marshal_adds_processor_step() {
    let builder = RouteBuilder::from("timer:tick").route_id("test-route");
    let definition = builder.marshal("json").build().unwrap();

    let first = &definition.steps()[0];
    assert!(matches!(first, BuilderStep::Processor(_)));
}
2701
/// `unmarshal("json")` is recorded as a `Processor` step.
#[test]
fn test_builder_unmarshal_adds_processor_step() {
    let builder = RouteBuilder::from("timer:tick").route_id("test-route");
    let definition = builder.unmarshal("json").build().unwrap();

    let first = &definition.steps()[0];
    assert!(matches!(first, BuilderStep::Processor(_)));
}
2711
/// `stream_cache(1024)` is recorded as a `Processor` step.
#[test]
fn test_builder_stream_cache_adds_processor_step() {
    let builder = RouteBuilder::from("timer:tick").route_id("test-route");
    let definition = builder.stream_cache(1024).build().unwrap();

    let first = &definition.steps()[0];
    assert!(matches!(first, BuilderStep::Processor(_)));
}
2721
/// `validate(path)` with a bare schema path produces a single `To` step
/// whose URI is the path prefixed with `validator:`.
#[test]
fn validate_adds_to_step_with_validator_uri() {
    let builder = RouteBuilder::from("direct:in").route_id("test");
    let def = builder.validate("schemas/order.xsd").build().unwrap();

    let steps = def.steps();
    assert_eq!(steps.len(), 1);

    let only = &steps[0];
    assert!(
        matches!(only, BuilderStep::To(uri) if uri == "validator:schemas/order.xsd"),
        "got: {:?}",
        only
    );
}
2737
/// Asking `marshal` for a data format name the builder does not know
/// (`csv` here) must panic with the "unknown data format" message.
#[test]
#[should_panic(expected = "unknown data format: 'csv'")]
fn test_builder_marshal_panics_on_unknown_format() {
    let builder = RouteBuilder::from("timer:tick").route_id("test-route");
    let _ = builder.marshal("csv").build();
}
2746
/// Asking `unmarshal` for a data format name the builder does not know
/// (`csv` here) must panic with the "unknown data format" message.
#[test]
#[should_panic(expected = "unknown data format: 'csv'")]
fn test_builder_unmarshal_panics_on_unknown_format() {
    let builder = RouteBuilder::from("timer:tick").route_id("test-route");
    let _ = builder.unmarshal("csv").build();
}
2755
/// `recipient_list(..)` places a `RecipientList` step first on the route.
#[test]
fn test_builder_recipient_list_creates_step() {
    let route = RouteBuilder::from("direct:start")
        .route_id("recipient-list-test")
        .recipient_list(Arc::new(|_| String::from("direct:a,direct:b")))
        .build()
        .unwrap();

    let first = &route.steps()[0];
    assert!(matches!(first, BuilderStep::RecipientList { .. }));
}
2769
/// `recipient_list_with_config(..)` places a `RecipientList` step first on
/// the route.
#[test]
fn test_builder_recipient_list_with_config_creates_step() {
    let list_config = RecipientListConfig::new(Arc::new(|_| String::from("mock:a")));

    let builder = RouteBuilder::from("direct:start").route_id("recipient-list-config-test");
    let route = builder
        .recipient_list_with_config(list_config)
        .build()
        .unwrap();

    let first = &route.steps()[0];
    assert!(matches!(first, BuilderStep::RecipientList { .. }));
}
2785
/// `script(language, source)` stores both arguments verbatim on a
/// `Script` step.
#[test]
fn test_builder_script_adds_script_step() {
    let builder = RouteBuilder::from("direct:start").route_id("script-test");
    let route = builder
        .script("rhai", "headers[\"x\"] = \"y\"")
        .build()
        .unwrap();

    let first = &route.steps()[0];
    assert!(matches!(
        first,
        BuilderStep::Script { language, script }
            if language == "rhai" && script == "headers[\"x\"] = \"y\""
    ));
}
2800
/// Both `delay(..)` and `delay_with_header(..)` each add one `Delay` step.
#[test]
fn test_builder_delay_and_delay_with_header_add_steps() {
    let fixed = Duration::from_millis(250);
    let fallback = Duration::from_millis(500);

    let route = RouteBuilder::from("direct:start")
        .route_id("delay-test")
        .delay(fixed)
        .delay_with_header(fallback, "x-delay")
        .build()
        .unwrap();

    let steps = route.steps();
    assert_eq!(steps.len(), 2);
    assert!(matches!(&steps[0], BuilderStep::Delay { .. }));
    assert!(matches!(&steps[1], BuilderStep::Delay { .. }));
}
2814
/// `log`, `stop` and a trailing `to` are recorded in call order; the
/// builder still records the step declared after `stop()`.
#[test]
fn test_builder_log_and_stop_add_steps_in_order() {
    let builder = RouteBuilder::from("direct:start").route_id("log-stop-test");
    let route = builder
        .log("hello", LogLevel::Info)
        .stop()
        .to("mock:after")
        .build()
        .unwrap();

    let steps = route.steps();
    assert_eq!(steps.len(), 3);
    assert!(matches!(
        &steps[0],
        BuilderStep::Log { message, .. } if message == "hello"
    ));
    assert!(matches!(&steps[1], BuilderStep::Stop));
    assert!(matches!(&steps[2], BuilderStep::To(uri) if uri == "mock:after"));
}
2833
/// `stream_cache_default()` is recorded as a `Processor` step.
#[test]
fn test_builder_stream_cache_default_adds_processor_step() {
    let builder = RouteBuilder::from("direct:start").route_id("stream-cache-default-test");
    let route = builder.stream_cache_default().build().unwrap();

    let first = &route.steps()[0];
    assert!(matches!(first, BuilderStep::Processor(_)));
}
2844
/// An argument that already starts with `validator:` must not get the
/// prefix added a second time.
#[test]
fn test_validate_preserves_existing_validator_prefix() {
    let builder = RouteBuilder::from("direct:in").route_id("validate-prefix-test");
    let route = builder
        .validate("validator:schemas/order.xsd")
        .build()
        .unwrap();

    let first = &route.steps()[0];
    assert!(matches!(
        first,
        BuilderStep::To(uri) if uri == "validator:schemas/order.xsd"
    ));
}
2858
/// Chains `weighted`, `failover` and `parallel(true)` and checks that the
/// resulting config reports the `Failover` strategy with `parallel` set.
#[test]
fn test_load_balance_builder_weighted_failover_parallel_config() {
    let weights = vec![
        ("direct:a".to_string(), 3),
        ("direct:b".to_string(), 1),
    ];

    let balanced = RouteBuilder::from("direct:start")
        .route_id("lb-weighted-failover-parallel")
        .load_balance()
        .weighted(weights)
        .failover()
        .parallel(true);
    let route = balanced
        .to("mock:result")
        .end_load_balance()
        .build()
        .unwrap();

    match &route.steps()[0] {
        BuilderStep::LoadBalance { config, .. } => {
            assert_eq!(config.strategy, LoadBalanceStrategy::Failover);
            assert!(config.parallel);
        }
        _ => panic!("Expected LoadBalance step"),
    }
}
2882
/// Every multicast setter used here (`parallel`, `parallel_limit`,
/// `stop_on_exception`, `timeout`, `aggregation`) must be reflected in the
/// step's config.
#[test]
fn test_multicast_builder_all_config_setters() {
    let deadline = Duration::from_millis(300);

    let fanout = RouteBuilder::from("direct:start")
        .route_id("multicast-config-test")
        .multicast()
        .parallel(true)
        .parallel_limit(4)
        .stop_on_exception(true)
        .timeout(deadline)
        .aggregation(MulticastStrategy::Original);
    let route = fanout.to("mock:a").end_multicast().build().unwrap();

    match &route.steps()[0] {
        BuilderStep::Multicast { config, .. } => {
            assert!(config.parallel);
            assert_eq!(config.parallel_limit, Some(4));
            assert!(config.stop_on_exception);
            assert_eq!(config.timeout, Some(Duration::from_millis(300)));
            assert!(matches!(config.aggregation, MulticastStrategy::Original));
        }
        _ => panic!("Expected Multicast step"),
    }
}
2908
/// `build_canonical()` must fail for a route containing a `set_header`
/// step, with an error that names the unsupported `processor` step.
#[test]
fn test_build_canonical_rejects_unsupported_processor_step() {
    let builder = RouteBuilder::from("direct:start")
        .route_id("canonical-reject")
        .set_header("k", Value::String("v".into()));
    let err = builder.build_canonical().unwrap_err();

    let message = err.to_string();
    assert!(message.contains("does not support step `processor`"));
}
2919}