1use camel_api::DelayConfig;
2use camel_api::aggregator::{
3 AggregationStrategy, AggregatorConfig, CompletionCondition, CompletionMode, CorrelationStrategy,
4};
5use camel_api::body::Body;
6use camel_api::body_converter::BodyType;
7use camel_api::circuit_breaker::CircuitBreakerConfig;
8use camel_api::dynamic_router::{DynamicRouterConfig, RouterExpression};
9use camel_api::error_handler::{ErrorHandlerConfig, RedeliveryPolicy};
10use camel_api::load_balancer::LoadBalancerConfig;
11use camel_api::loop_eip::{LoopConfig, LoopMode};
12use camel_api::multicast::{MulticastConfig, MulticastStrategy};
13use camel_api::recipient_list::{RecipientListConfig, RecipientListExpression};
14use camel_api::routing_slip::{RoutingSlipConfig, RoutingSlipExpression};
15use camel_api::splitter::SplitterConfig;
16use camel_api::throttler::{ThrottleStrategy, ThrottlerConfig};
17use camel_api::{
18 BoxProcessor, CamelError, CanonicalRouteSpec, Exchange, FilterPredicate, IdentityProcessor,
19 ProcessorFn, Value,
20 runtime::{
21 CanonicalAggregateSpec, CanonicalAggregateStrategySpec, CanonicalCircuitBreakerSpec,
22 CanonicalSplitAggregationSpec, CanonicalSplitExpressionSpec, CanonicalStepSpec,
23 CanonicalWhenSpec,
24 },
25};
26use camel_component_api::ConcurrencyModel;
27use camel_core::route::{BuilderStep, DeclarativeWhenStep, RouteDefinition, WhenStep};
28use camel_processor::{
29 ConvertBodyTo, DynamicSetHeader, LogLevel, MapBody, MarshalService, SetBody, SetHeader,
30 StreamCacheService, UnmarshalService, builtin_data_format,
31};
32
33pub trait StepAccumulator: Sized {
39 fn steps_mut(&mut self) -> &mut Vec<BuilderStep>;
40
41 fn to(mut self, endpoint: impl Into<String>) -> Self {
42 self.steps_mut().push(BuilderStep::To(endpoint.into()));
43 self
44 }
45
46 fn process<F, Fut>(mut self, f: F) -> Self
47 where
48 F: Fn(Exchange) -> Fut + Send + Sync + 'static,
49 Fut: std::future::Future<Output = Result<Exchange, CamelError>> + Send + 'static,
50 {
51 let svc = ProcessorFn::new(f);
52 self.steps_mut()
53 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
54 self
55 }
56
57 fn process_fn(mut self, processor: BoxProcessor) -> Self {
58 self.steps_mut().push(BuilderStep::Processor(processor));
59 self
60 }
61
62 fn set_header(mut self, key: impl Into<String>, value: impl Into<Value>) -> Self {
63 let svc = SetHeader::new(IdentityProcessor, key, value);
64 self.steps_mut()
65 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
66 self
67 }
68
69 fn map_body<F>(mut self, mapper: F) -> Self
70 where
71 F: Fn(Body) -> Body + Clone + Send + Sync + 'static,
72 {
73 let svc = MapBody::new(IdentityProcessor, mapper);
74 self.steps_mut()
75 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
76 self
77 }
78
79 fn set_body<B>(mut self, body: B) -> Self
80 where
81 B: Into<Body> + Clone + Send + Sync + 'static,
82 {
83 let body: Body = body.into();
84 let svc = SetBody::new(IdentityProcessor, move |_ex: &Exchange| body.clone());
85 self.steps_mut()
86 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
87 self
88 }
89
90 fn transform<B>(self, body: B) -> Self
95 where
96 B: Into<Body> + Clone + Send + Sync + 'static,
97 {
98 self.set_body(body)
99 }
100
101 fn set_body_fn<F>(mut self, expr: F) -> Self
102 where
103 F: Fn(&Exchange) -> Body + Clone + Send + Sync + 'static,
104 {
105 let svc = SetBody::new(IdentityProcessor, expr);
106 self.steps_mut()
107 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
108 self
109 }
110
111 fn set_header_fn<F>(mut self, key: impl Into<String>, expr: F) -> Self
112 where
113 F: Fn(&Exchange) -> Value + Clone + Send + Sync + 'static,
114 {
115 let svc = DynamicSetHeader::new(IdentityProcessor, key, expr);
116 self.steps_mut()
117 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
118 self
119 }
120
121 fn aggregate(mut self, config: AggregatorConfig) -> Self {
122 self.steps_mut().push(BuilderStep::Aggregate { config });
123 self
124 }
125
126 fn stop(mut self) -> Self {
132 self.steps_mut().push(BuilderStep::Stop);
133 self
134 }
135
136 fn delay(mut self, duration: std::time::Duration) -> Self {
137 self.steps_mut().push(BuilderStep::Delay {
138 config: DelayConfig::from_duration(duration),
139 });
140 self
141 }
142
143 fn delay_with_header(
144 mut self,
145 duration: std::time::Duration,
146 header: impl Into<String>,
147 ) -> Self {
148 self.steps_mut().push(BuilderStep::Delay {
149 config: DelayConfig::from_duration_with_header(duration, header),
150 });
151 self
152 }
153
154 fn log(mut self, message: impl Into<String>, level: LogLevel) -> Self {
158 self.steps_mut().push(BuilderStep::Log {
159 level,
160 message: message.into(),
161 });
162 self
163 }
164
165 fn convert_body_to(mut self, target: BodyType) -> Self {
177 let svc = ConvertBodyTo::new(IdentityProcessor, target);
178 self.steps_mut()
179 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
180 self
181 }
182
183 fn stream_cache(mut self, threshold: usize) -> Self {
184 let config = camel_api::stream_cache::StreamCacheConfig::new(threshold);
185 let svc = StreamCacheService::new(IdentityProcessor, config);
186 self.steps_mut()
187 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
188 self
189 }
190
191 fn stream_cache_default(self) -> Self {
195 self.stream_cache(camel_api::stream_cache::DEFAULT_STREAM_CACHE_THRESHOLD)
196 }
197
198 fn marshal(mut self, format: impl Into<String>) -> Self {
208 let name = format.into();
209 let df =
210 builtin_data_format(&name).unwrap_or_else(|| panic!("unknown data format: '{name}'"));
211 let svc = MarshalService::new(IdentityProcessor, df);
212 self.steps_mut()
213 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
214 self
215 }
216
217 fn unmarshal(mut self, format: impl Into<String>) -> Self {
227 let name = format.into();
228 let df =
229 builtin_data_format(&name).unwrap_or_else(|| panic!("unknown data format: '{name}'"));
230 let svc = UnmarshalService::new(IdentityProcessor, df);
231 self.steps_mut()
232 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
233 self
234 }
235
236 fn validate(mut self, schema_path: impl Into<String>) -> Self {
245 let path = schema_path.into();
246 let uri = if path.starts_with("validator:") {
247 path
248 } else {
249 format!("validator:{path}")
250 };
251 self.steps_mut().push(BuilderStep::To(uri));
252 self
253 }
254
255 fn script(mut self, language: impl Into<String>, script: impl Into<String>) -> Self {
266 self.steps_mut().push(BuilderStep::Script {
267 language: language.into(),
268 script: script.into(),
269 });
270 self
271 }
272}
273
274pub struct RouteBuilder {
286 from_uri: String,
287 steps: Vec<BuilderStep>,
288 error_handler: Option<ErrorHandlerConfig>,
289 error_handler_mode: ErrorHandlerMode,
290 circuit_breaker_config: Option<CircuitBreakerConfig>,
291 concurrency: Option<ConcurrencyModel>,
292 route_id: Option<String>,
293 auto_startup: Option<bool>,
294 startup_order: Option<i32>,
295}
296
297#[derive(Default)]
298enum ErrorHandlerMode {
299 #[default]
300 None,
301 ExplicitConfig,
302 Shorthand {
303 dlc_uri: Option<String>,
304 specs: Vec<OnExceptionSpec>,
305 },
306 Mixed,
307}
308
309#[derive(Clone)]
310struct OnExceptionSpec {
311 matches: std::sync::Arc<dyn Fn(&CamelError) -> bool + Send + Sync>,
312 retry: Option<RedeliveryPolicy>,
313 handled_by: Option<String>,
314}
315
316impl RouteBuilder {
317 pub fn from(endpoint: &str) -> Self {
319 Self {
320 from_uri: endpoint.to_string(),
321 steps: Vec::new(),
322 error_handler: None,
323 error_handler_mode: ErrorHandlerMode::None,
324 circuit_breaker_config: None,
325 concurrency: None,
326 route_id: None,
327 auto_startup: None,
328 startup_order: None,
329 }
330 }
331
332 pub fn filter<F>(self, predicate: F) -> FilterBuilder
336 where
337 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
338 {
339 FilterBuilder {
340 parent: self,
341 predicate: std::sync::Arc::new(predicate),
342 steps: vec![],
343 }
344 }
345
346 pub fn choice(self) -> ChoiceBuilder {
352 ChoiceBuilder {
353 parent: self,
354 whens: vec![],
355 _otherwise: None,
356 }
357 }
358
359 pub fn wire_tap(mut self, endpoint: &str) -> Self {
363 self.steps.push(BuilderStep::WireTap {
364 uri: endpoint.to_string(),
365 });
366 self
367 }
368
369 pub fn error_handler(mut self, config: ErrorHandlerConfig) -> Self {
371 self.error_handler_mode = match self.error_handler_mode {
372 ErrorHandlerMode::None | ErrorHandlerMode::ExplicitConfig => {
373 ErrorHandlerMode::ExplicitConfig
374 }
375 ErrorHandlerMode::Shorthand { .. } | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
376 };
377 self.error_handler = Some(config);
378 self
379 }
380
381 pub fn dead_letter_channel(mut self, uri: impl Into<String>) -> Self {
383 let uri = uri.into();
384 self.error_handler_mode = match self.error_handler_mode {
385 ErrorHandlerMode::None => ErrorHandlerMode::Shorthand {
386 dlc_uri: Some(uri),
387 specs: Vec::new(),
388 },
389 ErrorHandlerMode::Shorthand { specs, .. } => ErrorHandlerMode::Shorthand {
390 dlc_uri: Some(uri),
391 specs,
392 },
393 ErrorHandlerMode::ExplicitConfig | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
394 };
395 self
396 }
397
398 pub fn on_exception<F>(mut self, matches: F) -> OnExceptionBuilder
400 where
401 F: Fn(&CamelError) -> bool + Send + Sync + 'static,
402 {
403 self.error_handler_mode = match self.error_handler_mode {
404 ErrorHandlerMode::None => ErrorHandlerMode::Shorthand {
405 dlc_uri: None,
406 specs: Vec::new(),
407 },
408 ErrorHandlerMode::ExplicitConfig | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
409 shorthand @ ErrorHandlerMode::Shorthand { .. } => shorthand,
410 };
411
412 OnExceptionBuilder {
413 parent: self,
414 policy: OnExceptionSpec {
415 matches: std::sync::Arc::new(matches),
416 retry: None,
417 handled_by: None,
418 },
419 }
420 }
421
422 pub fn circuit_breaker(mut self, config: CircuitBreakerConfig) -> Self {
424 self.circuit_breaker_config = Some(config);
425 self
426 }
427
428 pub fn concurrent(mut self, max: usize) -> Self {
442 let max = if max == 0 { None } else { Some(max) };
443 self.concurrency = Some(ConcurrencyModel::Concurrent { max });
444 self
445 }
446
447 pub fn sequential(mut self) -> Self {
452 self.concurrency = Some(ConcurrencyModel::Sequential);
453 self
454 }
455
456 pub fn route_id(mut self, id: impl Into<String>) -> Self {
460 self.route_id = Some(id.into());
461 self
462 }
463
464 pub fn auto_startup(mut self, auto: bool) -> Self {
468 self.auto_startup = Some(auto);
469 self
470 }
471
472 pub fn startup_order(mut self, order: i32) -> Self {
476 self.startup_order = Some(order);
477 self
478 }
479
480 pub fn split(self, config: SplitterConfig) -> SplitBuilder {
486 SplitBuilder {
487 parent: self,
488 config,
489 steps: Vec::new(),
490 }
491 }
492
493 pub fn multicast(self) -> MulticastBuilder {
499 MulticastBuilder {
500 parent: self,
501 steps: Vec::new(),
502 config: MulticastConfig::new(),
503 }
504 }
505
506 pub fn throttle(self, max_requests: usize, period: std::time::Duration) -> ThrottleBuilder {
513 ThrottleBuilder {
514 parent: self,
515 config: ThrottlerConfig::new(max_requests, period),
516 steps: Vec::new(),
517 }
518 }
519
520 pub fn loop_count(self, count: usize) -> LoopBuilder {
522 LoopBuilder {
523 parent: self,
524 config: LoopConfig {
525 mode: LoopMode::Count(count),
526 },
527 steps: vec![],
528 }
529 }
530
531 pub fn loop_while<F>(self, predicate: F) -> LoopBuilder
533 where
534 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
535 {
536 LoopBuilder {
537 parent: self,
538 config: LoopConfig {
539 mode: LoopMode::While(std::sync::Arc::new(predicate)),
540 },
541 steps: vec![],
542 }
543 }
544
545 pub fn load_balance(self) -> LoadBalancerBuilder {
551 LoadBalancerBuilder {
552 parent: self,
553 config: LoadBalancerConfig::round_robin(),
554 steps: Vec::new(),
555 }
556 }
557
558 pub fn dynamic_router(self, expression: RouterExpression) -> Self {
574 self.dynamic_router_with_config(DynamicRouterConfig::new(expression))
575 }
576
577 pub fn dynamic_router_with_config(mut self, config: DynamicRouterConfig) -> Self {
581 self.steps.push(BuilderStep::DynamicRouter { config });
582 self
583 }
584
585 pub fn routing_slip(self, expression: RoutingSlipExpression) -> Self {
586 self.routing_slip_with_config(RoutingSlipConfig::new(expression))
587 }
588
589 pub fn routing_slip_with_config(mut self, config: RoutingSlipConfig) -> Self {
590 self.steps.push(BuilderStep::RoutingSlip { config });
591 self
592 }
593
594 pub fn recipient_list(self, expression: RecipientListExpression) -> Self {
595 self.recipient_list_with_config(RecipientListConfig::new(expression))
596 }
597
598 pub fn recipient_list_with_config(mut self, config: RecipientListConfig) -> Self {
599 self.steps.push(BuilderStep::RecipientList { config });
600 self
601 }
602
603 pub fn build(self) -> Result<RouteDefinition, CamelError> {
605 if self.from_uri.is_empty() {
606 return Err(CamelError::RouteError(
607 "route must have a 'from' URI".to_string(),
608 ));
609 }
610 let route_id = self.route_id.ok_or_else(|| {
611 CamelError::RouteError(
612 "route must have a 'route_id' — call .route_id(\"name\") on the builder"
613 .to_string(),
614 )
615 })?;
616 let resolved_error_handler = match self.error_handler_mode {
617 ErrorHandlerMode::None => self.error_handler,
618 ErrorHandlerMode::ExplicitConfig => self.error_handler,
619 ErrorHandlerMode::Mixed => {
620 return Err(CamelError::RouteError(
621 "mixed error handler modes: cannot combine .error_handler(config) with shorthand methods".into(),
622 ));
623 }
624 ErrorHandlerMode::Shorthand { dlc_uri, specs } => {
625 let mut config = if let Some(uri) = dlc_uri {
626 ErrorHandlerConfig::dead_letter_channel(uri)
627 } else {
628 ErrorHandlerConfig::log_only()
629 };
630
631 for spec in specs {
632 let matcher = spec.matches.clone();
633 let mut builder = config.on_exception(move |e| matcher(e));
634
635 if let Some(retry) = spec.retry {
636 builder = builder.retry(retry.max_attempts).with_backoff(
637 retry.initial_delay,
638 retry.multiplier,
639 retry.max_delay,
640 );
641 if retry.jitter_factor > 0.0 {
642 builder = builder.with_jitter(retry.jitter_factor);
643 }
644 }
645
646 if let Some(uri) = spec.handled_by {
647 builder = builder.handled_by(uri);
648 }
649
650 config = builder.build();
651 }
652
653 Some(config)
654 }
655 };
656
657 let definition = RouteDefinition::new(self.from_uri, self.steps);
658 let definition = if let Some(eh) = resolved_error_handler {
659 definition.with_error_handler(eh)
660 } else {
661 definition
662 };
663 let definition = if let Some(cb) = self.circuit_breaker_config {
664 definition.with_circuit_breaker(cb)
665 } else {
666 definition
667 };
668 let definition = if let Some(concurrency) = self.concurrency {
669 definition.with_concurrency(concurrency)
670 } else {
671 definition
672 };
673 let definition = definition.with_route_id(route_id);
674 let definition = if let Some(auto) = self.auto_startup {
675 definition.with_auto_startup(auto)
676 } else {
677 definition
678 };
679 let definition = if let Some(order) = self.startup_order {
680 definition.with_startup_order(order)
681 } else {
682 definition
683 };
684 Ok(definition)
685 }
686
687 pub fn build_canonical(self) -> Result<CanonicalRouteSpec, CamelError> {
689 if self.from_uri.is_empty() {
690 return Err(CamelError::RouteError(
691 "route must have a 'from' URI".to_string(),
692 ));
693 }
694 let route_id = self.route_id.ok_or_else(|| {
695 CamelError::RouteError(
696 "route must have a 'route_id' — call .route_id(\"name\") on the builder"
697 .to_string(),
698 )
699 })?;
700
701 let steps = canonicalize_steps(self.steps)?;
702 let circuit_breaker = self
703 .circuit_breaker_config
704 .map(canonicalize_circuit_breaker);
705
706 let spec = CanonicalRouteSpec {
707 route_id,
708 from: self.from_uri,
709 steps,
710 circuit_breaker,
711 version: camel_api::CANONICAL_CONTRACT_VERSION,
712 };
713 spec.validate_contract()?;
714 Ok(spec)
715 }
716}
717
718pub struct OnExceptionBuilder {
719 parent: RouteBuilder,
720 policy: OnExceptionSpec,
721}
722
723impl OnExceptionBuilder {
724 pub fn retry(mut self, max_attempts: u32) -> Self {
725 self.policy.retry = Some(RedeliveryPolicy::new(max_attempts));
726 self
727 }
728
729 pub fn with_backoff(
730 mut self,
731 initial: std::time::Duration,
732 multiplier: f64,
733 max: std::time::Duration,
734 ) -> Self {
735 if let Some(ref mut retry) = self.policy.retry {
736 retry.initial_delay = initial;
737 retry.multiplier = multiplier;
738 retry.max_delay = max;
739 }
740 self
741 }
742
743 pub fn with_jitter(mut self, jitter_factor: f64) -> Self {
744 if let Some(ref mut retry) = self.policy.retry {
745 retry.jitter_factor = jitter_factor.clamp(0.0, 1.0);
746 }
747 self
748 }
749
750 pub fn handled_by(mut self, uri: impl Into<String>) -> Self {
751 self.policy.handled_by = Some(uri.into());
752 self
753 }
754
755 pub fn end_on_exception(mut self) -> RouteBuilder {
756 if let ErrorHandlerMode::Shorthand { ref mut specs, .. } = self.parent.error_handler_mode {
757 specs.push(self.policy);
758 }
759 self.parent
760 }
761}
762
763fn canonicalize_steps(steps: Vec<BuilderStep>) -> Result<Vec<CanonicalStepSpec>, CamelError> {
764 let mut canonical = Vec::with_capacity(steps.len());
765 for step in steps {
766 canonical.push(canonicalize_step(step)?);
767 }
768 Ok(canonical)
769}
770
771fn canonicalize_step(step: BuilderStep) -> Result<CanonicalStepSpec, CamelError> {
772 match step {
773 BuilderStep::To(uri) => Ok(CanonicalStepSpec::To { uri }),
774 BuilderStep::Log { message, .. } => Ok(CanonicalStepSpec::Log { message }),
775 BuilderStep::Stop => Ok(CanonicalStepSpec::Stop),
776 BuilderStep::WireTap { uri } => Ok(CanonicalStepSpec::WireTap { uri }),
777 BuilderStep::Delay { config } => Ok(CanonicalStepSpec::Delay {
778 delay_ms: config.delay_ms,
779 dynamic_header: config.dynamic_header,
780 }),
781 BuilderStep::DeclarativeScript { expression } => {
782 Ok(CanonicalStepSpec::Script { expression })
783 }
784 BuilderStep::DeclarativeFilter { predicate, steps } => Ok(CanonicalStepSpec::Filter {
785 predicate,
786 steps: canonicalize_steps(steps)?,
787 }),
788 BuilderStep::DeclarativeChoice { whens, otherwise } => {
789 let mut canonical_whens = Vec::with_capacity(whens.len());
790 for DeclarativeWhenStep { predicate, steps } in whens {
791 canonical_whens.push(CanonicalWhenSpec {
792 predicate,
793 steps: canonicalize_steps(steps)?,
794 });
795 }
796 let otherwise = match otherwise {
797 Some(steps) => Some(canonicalize_steps(steps)?),
798 None => None,
799 };
800 Ok(CanonicalStepSpec::Choice {
801 whens: canonical_whens,
802 otherwise,
803 })
804 }
805 BuilderStep::DeclarativeSplit {
806 expression,
807 aggregation,
808 parallel,
809 parallel_limit,
810 stop_on_exception,
811 steps,
812 } => Ok(CanonicalStepSpec::Split {
813 expression: CanonicalSplitExpressionSpec::Language(expression),
814 aggregation: canonicalize_split_aggregation(aggregation)?,
815 parallel,
816 parallel_limit,
817 stop_on_exception,
818 steps: canonicalize_steps(steps)?,
819 }),
820 BuilderStep::Aggregate { config } => Ok(CanonicalStepSpec::Aggregate {
821 config: canonicalize_aggregate(config)?,
822 }),
823 other => {
824 let step_name = canonical_step_name(&other);
825 let detail = camel_api::canonical_contract_rejection_reason(step_name)
826 .unwrap_or("not included in canonical v1");
827 Err(CamelError::RouteError(format!(
828 "canonical v1 does not support step `{step_name}`: {detail}"
829 )))
830 }
831 }
832}
833
834fn canonicalize_split_aggregation(
835 strategy: camel_api::splitter::AggregationStrategy,
836) -> Result<CanonicalSplitAggregationSpec, CamelError> {
837 match strategy {
838 camel_api::splitter::AggregationStrategy::LastWins => {
839 Ok(CanonicalSplitAggregationSpec::LastWins)
840 }
841 camel_api::splitter::AggregationStrategy::CollectAll => {
842 Ok(CanonicalSplitAggregationSpec::CollectAll)
843 }
844 camel_api::splitter::AggregationStrategy::Custom(_) => Err(CamelError::RouteError(
845 "canonical v1 does not support custom split aggregation".to_string(),
846 )),
847 camel_api::splitter::AggregationStrategy::Original => {
848 Ok(CanonicalSplitAggregationSpec::Original)
849 }
850 }
851}
852
853fn extract_completion_fields(
854 mode: &CompletionMode,
855) -> Result<(Option<usize>, Option<u64>), CamelError> {
856 match mode {
857 CompletionMode::Single(cond) => match cond {
858 CompletionCondition::Size(n) => Ok((Some(*n), None)),
859 CompletionCondition::Timeout(d) => Ok((None, Some(d.as_millis() as u64))),
860 CompletionCondition::Predicate(_) => Err(CamelError::RouteError(
861 "canonical v1 does not support aggregate predicate completion".to_string(),
862 )),
863 },
864 CompletionMode::Any(conds) => {
865 let mut size = None;
866 let mut timeout_ms = None;
867 for cond in conds {
868 match cond {
869 CompletionCondition::Size(n) => size = Some(*n),
870 CompletionCondition::Timeout(d) => timeout_ms = Some(d.as_millis() as u64),
871 CompletionCondition::Predicate(_) => {
872 return Err(CamelError::RouteError(
873 "canonical v1 does not support aggregate predicate completion"
874 .to_string(),
875 ));
876 }
877 }
878 }
879 Ok((size, timeout_ms))
880 }
881 }
882}
883
884fn canonicalize_aggregate(config: AggregatorConfig) -> Result<CanonicalAggregateSpec, CamelError> {
885 let (completion_size, completion_timeout_ms) = extract_completion_fields(&config.completion)?;
886
887 let header = match &config.correlation {
888 CorrelationStrategy::HeaderName(h) => h.clone(),
889 CorrelationStrategy::Expression { expr, .. } => expr.clone(),
890 CorrelationStrategy::Fn(_) => {
891 return Err(CamelError::RouteError(
892 "canonical v1 does not support Fn correlation strategy".to_string(),
893 ));
894 }
895 };
896
897 let correlation_key = match &config.correlation {
898 CorrelationStrategy::HeaderName(_) => None,
899 CorrelationStrategy::Expression { expr, .. } => Some(expr.clone()),
900 CorrelationStrategy::Fn(_) => unreachable!(),
901 };
902
903 let strategy = match config.strategy {
904 AggregationStrategy::CollectAll => CanonicalAggregateStrategySpec::CollectAll,
905 AggregationStrategy::Custom(_) => {
906 return Err(CamelError::RouteError(
907 "canonical v1 does not support custom aggregate strategy".to_string(),
908 ));
909 }
910 };
911 let bucket_ttl_ms = config
912 .bucket_ttl
913 .map(|ttl| u64::try_from(ttl.as_millis()).unwrap_or(u64::MAX));
914
915 Ok(CanonicalAggregateSpec {
916 header,
917 completion_size,
918 completion_timeout_ms,
919 correlation_key,
920 force_completion_on_stop: if config.force_completion_on_stop {
921 Some(true)
922 } else {
923 None
924 },
925 discard_on_timeout: if config.discard_on_timeout {
926 Some(true)
927 } else {
928 None
929 },
930 strategy,
931 max_buckets: config.max_buckets,
932 bucket_ttl_ms,
933 })
934}
935
936fn canonicalize_circuit_breaker(config: CircuitBreakerConfig) -> CanonicalCircuitBreakerSpec {
937 CanonicalCircuitBreakerSpec {
938 failure_threshold: config.failure_threshold,
939 open_duration_ms: u64::try_from(config.open_duration.as_millis()).unwrap_or(u64::MAX),
940 }
941}
942
943fn canonical_step_name(step: &BuilderStep) -> &'static str {
944 match step {
945 BuilderStep::Processor(_) => "processor",
946 BuilderStep::To(_) => "to",
947 BuilderStep::Stop => "stop",
948 BuilderStep::Log { .. } => "log",
949 BuilderStep::DeclarativeSetHeader { .. } => "set_header",
950 BuilderStep::DeclarativeSetBody { .. } => "set_body",
951 BuilderStep::DeclarativeFilter { .. } => "filter",
952 BuilderStep::DeclarativeChoice { .. } => "choice",
953 BuilderStep::DeclarativeScript { .. } => "script",
954 BuilderStep::DeclarativeSplit { .. } => "split",
955 BuilderStep::Split { .. } => "split",
956 BuilderStep::Loop { .. } | BuilderStep::DeclarativeLoop { .. } => "loop",
957 BuilderStep::Aggregate { .. } => "aggregate",
958 BuilderStep::Filter { .. } => "filter",
959 BuilderStep::Choice { .. } => "choice",
960 BuilderStep::WireTap { .. } => "wire_tap",
961 BuilderStep::Delay { .. } => "delay",
962 BuilderStep::Multicast { .. } => "multicast",
963 BuilderStep::DeclarativeLog { .. } => "log",
964 BuilderStep::Bean { .. } => "bean",
965 BuilderStep::Script { .. } => "script",
966 BuilderStep::Throttle { .. } => "throttle",
967 BuilderStep::LoadBalance { .. } => "load_balancer",
968 BuilderStep::DynamicRouter { .. } => "dynamic_router",
969 BuilderStep::RoutingSlip { .. } => "routing_slip",
970 BuilderStep::DeclarativeDynamicRouter { .. } => "declarative_dynamic_router",
971 BuilderStep::DeclarativeRoutingSlip { .. } => "declarative_routing_slip",
972 BuilderStep::RecipientList { .. } => "recipient_list",
973 BuilderStep::DeclarativeRecipientList { .. } => "declarative_recipient_list",
974 }
975}
976
977impl StepAccumulator for RouteBuilder {
978 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
979 &mut self.steps
980 }
981}
982
983pub struct SplitBuilder {
991 parent: RouteBuilder,
992 config: SplitterConfig,
993 steps: Vec<BuilderStep>,
994}
995
996impl SplitBuilder {
997 pub fn filter<F>(self, predicate: F) -> FilterInSplitBuilder
999 where
1000 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1001 {
1002 FilterInSplitBuilder {
1003 parent: self,
1004 predicate: std::sync::Arc::new(predicate),
1005 steps: vec![],
1006 }
1007 }
1008
1009 pub fn end_split(mut self) -> RouteBuilder {
1012 let split_step = BuilderStep::Split {
1013 config: self.config,
1014 steps: self.steps,
1015 };
1016 self.parent.steps.push(split_step);
1017 self.parent
1018 }
1019}
1020
1021impl StepAccumulator for SplitBuilder {
1022 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1023 &mut self.steps
1024 }
1025}
1026
1027pub struct FilterBuilder {
1029 parent: RouteBuilder,
1030 predicate: FilterPredicate,
1031 steps: Vec<BuilderStep>,
1032}
1033
1034impl FilterBuilder {
1035 pub fn end_filter(mut self) -> RouteBuilder {
1038 let step = BuilderStep::Filter {
1039 predicate: self.predicate,
1040 steps: self.steps,
1041 };
1042 self.parent.steps.push(step);
1043 self.parent
1044 }
1045}
1046
1047impl StepAccumulator for FilterBuilder {
1048 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1049 &mut self.steps
1050 }
1051}
1052
1053pub struct FilterInSplitBuilder {
1055 parent: SplitBuilder,
1056 predicate: FilterPredicate,
1057 steps: Vec<BuilderStep>,
1058}
1059
1060impl FilterInSplitBuilder {
1061 pub fn end_filter(mut self) -> SplitBuilder {
1063 let step = BuilderStep::Filter {
1064 predicate: self.predicate,
1065 steps: self.steps,
1066 };
1067 self.parent.steps.push(step);
1068 self.parent
1069 }
1070}
1071
1072impl StepAccumulator for FilterInSplitBuilder {
1073 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1074 &mut self.steps
1075 }
1076}
1077
1078pub struct ChoiceBuilder {
1085 parent: RouteBuilder,
1086 whens: Vec<WhenStep>,
1087 _otherwise: Option<Vec<BuilderStep>>,
1088}
1089
1090impl ChoiceBuilder {
1091 pub fn when<F>(self, predicate: F) -> WhenBuilder
1094 where
1095 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1096 {
1097 WhenBuilder {
1098 parent: self,
1099 predicate: std::sync::Arc::new(predicate),
1100 steps: vec![],
1101 }
1102 }
1103
1104 pub fn otherwise(self) -> OtherwiseBuilder {
1108 OtherwiseBuilder {
1109 parent: self,
1110 steps: vec![],
1111 }
1112 }
1113
1114 pub fn end_choice(mut self) -> RouteBuilder {
1118 let step = BuilderStep::Choice {
1119 whens: self.whens,
1120 otherwise: self._otherwise,
1121 };
1122 self.parent.steps.push(step);
1123 self.parent
1124 }
1125}
1126
1127pub struct WhenBuilder {
1129 parent: ChoiceBuilder,
1130 predicate: camel_api::FilterPredicate,
1131 steps: Vec<BuilderStep>,
1132}
1133
1134impl WhenBuilder {
1135 pub fn end_when(mut self) -> ChoiceBuilder {
1138 self.parent.whens.push(WhenStep {
1139 predicate: self.predicate,
1140 steps: self.steps,
1141 });
1142 self.parent
1143 }
1144}
1145
1146impl StepAccumulator for WhenBuilder {
1147 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1148 &mut self.steps
1149 }
1150}
1151
1152pub struct OtherwiseBuilder {
1154 parent: ChoiceBuilder,
1155 steps: Vec<BuilderStep>,
1156}
1157
1158impl OtherwiseBuilder {
1159 pub fn end_otherwise(self) -> ChoiceBuilder {
1161 let OtherwiseBuilder { mut parent, steps } = self;
1162 parent._otherwise = Some(steps);
1163 parent
1164 }
1165}
1166
1167impl StepAccumulator for OtherwiseBuilder {
1168 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1169 &mut self.steps
1170 }
1171}
1172
1173pub struct MulticastBuilder {
1181 parent: RouteBuilder,
1182 steps: Vec<BuilderStep>,
1183 config: MulticastConfig,
1184}
1185
1186impl MulticastBuilder {
1187 pub fn parallel(mut self, parallel: bool) -> Self {
1188 self.config = self.config.parallel(parallel);
1189 self
1190 }
1191
1192 pub fn parallel_limit(mut self, limit: usize) -> Self {
1193 self.config = self.config.parallel_limit(limit);
1194 self
1195 }
1196
1197 pub fn stop_on_exception(mut self, stop: bool) -> Self {
1198 self.config = self.config.stop_on_exception(stop);
1199 self
1200 }
1201
1202 pub fn timeout(mut self, duration: std::time::Duration) -> Self {
1203 self.config = self.config.timeout(duration);
1204 self
1205 }
1206
1207 pub fn aggregation(mut self, strategy: MulticastStrategy) -> Self {
1208 self.config = self.config.aggregation(strategy);
1209 self
1210 }
1211
1212 pub fn end_multicast(mut self) -> RouteBuilder {
1213 let step = BuilderStep::Multicast {
1214 steps: self.steps,
1215 config: self.config,
1216 };
1217 self.parent.steps.push(step);
1218 self.parent
1219 }
1220}
1221
1222impl StepAccumulator for MulticastBuilder {
1223 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1224 &mut self.steps
1225 }
1226}
1227
1228pub struct ThrottleBuilder {
1236 parent: RouteBuilder,
1237 config: ThrottlerConfig,
1238 steps: Vec<BuilderStep>,
1239}
1240
1241impl ThrottleBuilder {
1242 pub fn strategy(mut self, strategy: ThrottleStrategy) -> Self {
1248 self.config = self.config.strategy(strategy);
1249 self
1250 }
1251
1252 pub fn end_throttle(mut self) -> RouteBuilder {
1255 let step = BuilderStep::Throttle {
1256 config: self.config,
1257 steps: self.steps,
1258 };
1259 self.parent.steps.push(step);
1260 self.parent
1261 }
1262}
1263
1264impl StepAccumulator for ThrottleBuilder {
1265 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1266 &mut self.steps
1267 }
1268}
1269
1270pub struct LoopBuilder {
1272 parent: RouteBuilder,
1273 config: LoopConfig,
1274 steps: Vec<BuilderStep>,
1275}
1276
1277impl LoopBuilder {
1278 pub fn loop_count(self, count: usize) -> LoopInLoopBuilder {
1279 LoopInLoopBuilder {
1280 parent: self,
1281 config: LoopConfig {
1282 mode: LoopMode::Count(count),
1283 },
1284 steps: vec![],
1285 }
1286 }
1287
1288 pub fn loop_while<F>(self, predicate: F) -> LoopInLoopBuilder
1289 where
1290 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
1291 {
1292 LoopInLoopBuilder {
1293 parent: self,
1294 config: LoopConfig {
1295 mode: LoopMode::While(std::sync::Arc::new(predicate)),
1296 },
1297 steps: vec![],
1298 }
1299 }
1300
1301 pub fn end_loop(mut self) -> RouteBuilder {
1302 let step = BuilderStep::Loop {
1303 config: self.config,
1304 steps: self.steps,
1305 };
1306 self.parent.steps.push(step);
1307 self.parent
1308 }
1309}
1310
1311impl StepAccumulator for LoopBuilder {
1312 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1313 &mut self.steps
1314 }
1315}
1316
1317pub struct LoopInLoopBuilder {
1318 parent: LoopBuilder,
1319 config: LoopConfig,
1320 steps: Vec<BuilderStep>,
1321}
1322
1323impl LoopInLoopBuilder {
1324 pub fn end_loop(mut self) -> LoopBuilder {
1325 let step = BuilderStep::Loop {
1326 config: self.config,
1327 steps: self.steps,
1328 };
1329 self.parent.steps.push(step);
1330 self.parent
1331 }
1332}
1333
1334impl StepAccumulator for LoopInLoopBuilder {
1335 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1336 &mut self.steps
1337 }
1338}
1339
1340pub struct LoadBalancerBuilder {
1348 parent: RouteBuilder,
1349 config: LoadBalancerConfig,
1350 steps: Vec<BuilderStep>,
1351}
1352
1353impl LoadBalancerBuilder {
1354 pub fn round_robin(mut self) -> Self {
1356 self.config = LoadBalancerConfig::round_robin();
1357 self
1358 }
1359
1360 pub fn random(mut self) -> Self {
1362 self.config = LoadBalancerConfig::random();
1363 self
1364 }
1365
1366 pub fn weighted(mut self, weights: Vec<(String, u32)>) -> Self {
1371 self.config = LoadBalancerConfig::weighted(weights);
1372 self
1373 }
1374
1375 pub fn failover(mut self) -> Self {
1380 self.config = LoadBalancerConfig::failover();
1381 self
1382 }
1383
1384 pub fn parallel(mut self, parallel: bool) -> Self {
1389 self.config = self.config.parallel(parallel);
1390 self
1391 }
1392
1393 pub fn end_load_balance(mut self) -> RouteBuilder {
1396 let step = BuilderStep::LoadBalance {
1397 config: self.config,
1398 steps: self.steps,
1399 };
1400 self.parent.steps.push(step);
1401 self.parent
1402 }
1403}
1404
1405impl StepAccumulator for LoadBalancerBuilder {
1406 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1407 &mut self.steps
1408 }
1409}
1410
1411#[cfg(test)]
1416mod tests {
1417 use super::*;
1418 use camel_api::error_handler::ErrorHandlerConfig;
1419 use camel_api::load_balancer::LoadBalanceStrategy;
1420 use camel_api::{Exchange, Message};
1421 use camel_core::route::BuilderStep;
1422 use std::sync::Arc;
1423 use std::time::Duration;
1424 use tower::{Service, ServiceExt};
1425
1426 #[test]
1427 fn test_builder_from_creates_definition() {
1428 let definition = RouteBuilder::from("timer:tick")
1429 .route_id("test-route")
1430 .build()
1431 .unwrap();
1432 assert_eq!(definition.from_uri(), "timer:tick");
1433 }
1434
1435 #[test]
1436 fn test_builder_empty_from_uri_errors() {
1437 let result = RouteBuilder::from("").route_id("test-route").build();
1438 assert!(result.is_err());
1439 }
1440
1441 #[test]
1442 fn test_builder_to_adds_step() {
1443 let definition = RouteBuilder::from("timer:tick")
1444 .route_id("test-route")
1445 .to("log:info")
1446 .build()
1447 .unwrap();
1448
1449 assert_eq!(definition.from_uri(), "timer:tick");
1450 assert!(matches!(&definition.steps()[0], BuilderStep::To(uri) if uri == "log:info"));
1452 }
1453
1454 #[test]
1455 fn test_builder_filter_adds_filter_step() {
1456 let definition = RouteBuilder::from("timer:tick")
1457 .route_id("test-route")
1458 .filter(|_ex| true)
1459 .to("mock:result")
1460 .end_filter()
1461 .build()
1462 .unwrap();
1463
1464 assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
1465 }
1466
1467 #[test]
1468 fn test_builder_set_header_adds_processor_step() {
1469 let definition = RouteBuilder::from("timer:tick")
1470 .route_id("test-route")
1471 .set_header("key", Value::String("value".into()))
1472 .build()
1473 .unwrap();
1474
1475 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1476 }
1477
1478 #[test]
1479 fn test_builder_map_body_adds_processor_step() {
1480 let definition = RouteBuilder::from("timer:tick")
1481 .route_id("test-route")
1482 .map_body(|body| body)
1483 .build()
1484 .unwrap();
1485
1486 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1487 }
1488
1489 #[test]
1490 fn test_builder_process_adds_processor_step() {
1491 let definition = RouteBuilder::from("timer:tick")
1492 .route_id("test-route")
1493 .process(|ex| async move { Ok(ex) })
1494 .build()
1495 .unwrap();
1496
1497 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1498 }
1499
1500 #[test]
1501 fn test_builder_chain_multiple_steps() {
1502 let definition = RouteBuilder::from("timer:tick")
1503 .route_id("test-route")
1504 .set_header("source", Value::String("timer".into()))
1505 .filter(|ex| ex.input.header("source").is_some())
1506 .to("log:info")
1507 .end_filter()
1508 .to("mock:result")
1509 .build()
1510 .unwrap();
1511
1512 assert_eq!(definition.steps().len(), 3); assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_))); assert!(matches!(&definition.steps()[1], BuilderStep::Filter { .. })); assert!(matches!(&definition.steps()[2], BuilderStep::To(uri) if uri == "mock:result"));
1516 }
1517
1518 #[test]
1519 fn test_loop_count_builder() {
1520 use camel_api::loop_eip::LoopMode;
1521
1522 let def = RouteBuilder::from("direct:start")
1523 .route_id("loop-test")
1524 .loop_count(3)
1525 .to("mock:inside")
1526 .end_loop()
1527 .to("mock:after")
1528 .build()
1529 .unwrap();
1530
1531 assert_eq!(def.steps().len(), 2);
1532 match &def.steps()[0] {
1533 BuilderStep::Loop { config, steps } => {
1534 assert!(matches!(config.mode, LoopMode::Count(3)));
1535 assert_eq!(steps.len(), 1);
1536 }
1537 other => panic!("Expected Loop, got {:?}", other),
1538 }
1539 assert!(matches!(def.steps()[1], BuilderStep::To(_)));
1540 }
1541
1542 #[test]
1543 fn test_loop_while_builder() {
1544 use camel_api::loop_eip::LoopMode;
1545
1546 let def = RouteBuilder::from("direct:start")
1547 .route_id("loop-while-test")
1548 .loop_while(|_ex| true)
1549 .to("mock:retry")
1550 .end_loop()
1551 .build()
1552 .unwrap();
1553
1554 assert_eq!(def.steps().len(), 1);
1555 match &def.steps()[0] {
1556 BuilderStep::Loop { config, steps } => {
1557 assert!(matches!(config.mode, LoopMode::While(_)));
1558 assert_eq!(steps.len(), 1);
1559 }
1560 other => panic!("Expected Loop, got {:?}", other),
1561 }
1562 }
1563
1564 #[test]
1565 fn test_nested_loop_builder() {
1566 use camel_api::loop_eip::LoopMode;
1567
1568 let def = RouteBuilder::from("direct:start")
1569 .route_id("nested-loop-test")
1570 .loop_count(2)
1571 .to("mock:outer")
1572 .loop_count(3)
1573 .to("mock:inner")
1574 .end_loop()
1575 .end_loop()
1576 .to("mock:after")
1577 .build()
1578 .unwrap();
1579
1580 assert_eq!(def.steps().len(), 2);
1581 match &def.steps()[0] {
1582 BuilderStep::Loop { steps, .. } => {
1583 assert_eq!(steps.len(), 2);
1584 match &steps[1] {
1585 BuilderStep::Loop {
1586 config,
1587 steps: inner_steps,
1588 } => {
1589 assert!(matches!(config.mode, LoopMode::Count(3)));
1590 assert_eq!(inner_steps.len(), 1);
1591 }
1592 other => panic!("Expected nested Loop, got {:?}", other),
1593 }
1594 }
1595 other => panic!("Expected outer Loop, got {:?}", other),
1596 }
1597 }
1598
1599 #[tokio::test]
1604 async fn test_set_header_processor_works() {
1605 let mut svc = SetHeader::new(IdentityProcessor, "greeting", Value::String("hello".into()));
1606 let exchange = Exchange::new(Message::new("test"));
1607 let result = svc.call(exchange).await.unwrap();
1608 assert_eq!(
1609 result.input.header("greeting"),
1610 Some(&Value::String("hello".into()))
1611 );
1612 }
1613
1614 #[tokio::test]
1615 async fn test_filter_processor_passes() {
1616 use camel_api::BoxProcessorExt;
1617 use camel_processor::FilterService;
1618
1619 let sub = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
1620 let mut svc =
1621 FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
1622 let exchange = Exchange::new(Message::new("pass"));
1623 let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
1624 assert_eq!(result.input.body.as_text(), Some("pass"));
1625 }
1626
1627 #[tokio::test]
1628 async fn test_filter_processor_blocks() {
1629 use camel_api::BoxProcessorExt;
1630 use camel_processor::FilterService;
1631
1632 let sub = BoxProcessor::from_fn(|_ex| {
1633 Box::pin(async move { Err(CamelError::ProcessorError("should not reach".into())) })
1634 });
1635 let mut svc =
1636 FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
1637 let exchange = Exchange::new(Message::new("reject"));
1638 let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
1639 assert_eq!(result.input.body.as_text(), Some("reject"));
1640 }
1641
1642 #[tokio::test]
1643 async fn test_map_body_processor_works() {
1644 let mapper = MapBody::new(IdentityProcessor, |body: Body| {
1645 if let Some(text) = body.as_text() {
1646 Body::Text(text.to_uppercase())
1647 } else {
1648 body
1649 }
1650 });
1651 let exchange = Exchange::new(Message::new("hello"));
1652 let result = mapper.oneshot(exchange).await.unwrap();
1653 assert_eq!(result.input.body.as_text(), Some("HELLO"));
1654 }
1655
1656 #[tokio::test]
1657 async fn test_process_custom_processor_works() {
1658 let processor = ProcessorFn::new(|mut ex: Exchange| async move {
1659 ex.set_property("custom", Value::Bool(true));
1660 Ok(ex)
1661 });
1662 let exchange = Exchange::new(Message::default());
1663 let result = processor.oneshot(exchange).await.unwrap();
1664 assert_eq!(result.property("custom"), Some(&Value::Bool(true)));
1665 }
1666
1667 #[tokio::test]
1672 async fn test_compose_pipeline_runs_steps_in_order() {
1673 use camel_core::route::compose_pipeline;
1674
1675 let processors = vec![
1676 BoxProcessor::new(SetHeader::new(
1677 IdentityProcessor,
1678 "step",
1679 Value::String("one".into()),
1680 )),
1681 BoxProcessor::new(MapBody::new(IdentityProcessor, |body: Body| {
1682 if let Some(text) = body.as_text() {
1683 Body::Text(format!("{}-processed", text))
1684 } else {
1685 body
1686 }
1687 })),
1688 ];
1689
1690 let pipeline = compose_pipeline(processors);
1691 let exchange = Exchange::new(Message::new("hello"));
1692 let result = pipeline.oneshot(exchange).await.unwrap();
1693
1694 assert_eq!(
1695 result.input.header("step"),
1696 Some(&Value::String("one".into()))
1697 );
1698 assert_eq!(result.input.body.as_text(), Some("hello-processed"));
1699 }
1700
1701 #[tokio::test]
1702 async fn test_compose_pipeline_empty_is_identity() {
1703 use camel_core::route::compose_pipeline;
1704
1705 let pipeline = compose_pipeline(vec![]);
1706 let exchange = Exchange::new(Message::new("unchanged"));
1707 let result = pipeline.oneshot(exchange).await.unwrap();
1708 assert_eq!(result.input.body.as_text(), Some("unchanged"));
1709 }
1710
1711 #[test]
1716 fn test_builder_circuit_breaker_sets_config() {
1717 use camel_api::circuit_breaker::CircuitBreakerConfig;
1718
1719 let config = CircuitBreakerConfig::new().failure_threshold(5);
1720 let definition = RouteBuilder::from("timer:tick")
1721 .route_id("test-route")
1722 .circuit_breaker(config)
1723 .build()
1724 .unwrap();
1725
1726 let cb = definition
1727 .circuit_breaker_config()
1728 .expect("circuit breaker should be set");
1729 assert_eq!(cb.failure_threshold, 5);
1730 }
1731
1732 #[test]
1733 fn test_builder_circuit_breaker_with_error_handler() {
1734 use camel_api::circuit_breaker::CircuitBreakerConfig;
1735 use camel_api::error_handler::ErrorHandlerConfig;
1736
1737 let cb_config = CircuitBreakerConfig::new().failure_threshold(3);
1738 let eh_config = ErrorHandlerConfig::log_only();
1739
1740 let definition = RouteBuilder::from("timer:tick")
1741 .route_id("test-route")
1742 .to("log:info")
1743 .circuit_breaker(cb_config)
1744 .error_handler(eh_config)
1745 .build()
1746 .unwrap();
1747
1748 assert!(
1749 definition.circuit_breaker_config().is_some(),
1750 "circuit breaker config should be set"
1751 );
1752 }
1754
1755 #[test]
1756 fn test_builder_on_exception_shorthand_multiple_clauses_preserve_order() {
1757 let definition = RouteBuilder::from("direct:start")
1758 .route_id("test-route")
1759 .dead_letter_channel("log:dlc")
1760 .on_exception(|e| matches!(e, CamelError::Io(_)))
1761 .retry(3)
1762 .handled_by("log:io")
1763 .end_on_exception()
1764 .on_exception(|e| matches!(e, CamelError::ProcessorError(_)))
1765 .retry(1)
1766 .end_on_exception()
1767 .to("mock:out")
1768 .build()
1769 .expect("route should build");
1770
1771 let cfg = definition
1772 .error_handler_config()
1773 .expect("error handler should be set");
1774 assert_eq!(cfg.policies.len(), 2);
1775 assert_eq!(cfg.dlc_uri.as_deref(), Some("log:dlc"));
1776 assert_eq!(
1777 cfg.policies[0].retry.as_ref().map(|p| p.max_attempts),
1778 Some(3)
1779 );
1780 assert_eq!(cfg.policies[0].handled_by.as_deref(), Some("log:io"));
1781 assert_eq!(
1782 cfg.policies[1].retry.as_ref().map(|p| p.max_attempts),
1783 Some(1)
1784 );
1785 }
1786
1787 #[test]
1788 fn test_builder_on_exception_mixed_mode_rejected() {
1789 let result = RouteBuilder::from("direct:start")
1790 .route_id("test-route")
1791 .error_handler(ErrorHandlerConfig::log_only())
1792 .on_exception(|_e| true)
1793 .end_on_exception()
1794 .to("mock:out")
1795 .build();
1796
1797 let err = result.err().expect("mixed mode should fail with an error");
1798
1799 assert!(
1800 format!("{err}").contains("mixed error handler modes"),
1801 "unexpected error: {err}"
1802 );
1803 }
1804
1805 #[test]
1806 fn test_builder_on_exception_backoff_and_jitter_without_retry_noop() {
1807 let definition = RouteBuilder::from("direct:start")
1808 .route_id("test-route")
1809 .on_exception(|_e| true)
1810 .with_backoff(Duration::from_millis(5), 3.0, Duration::from_millis(100))
1811 .with_jitter(0.5)
1812 .end_on_exception()
1813 .to("mock:out")
1814 .build()
1815 .expect("route should build");
1816
1817 let cfg = definition
1818 .error_handler_config()
1819 .expect("error handler should be set");
1820 assert_eq!(cfg.policies.len(), 1);
1821 assert!(cfg.policies[0].retry.is_none());
1822 }
1823
1824 #[test]
1825 fn test_builder_dead_letter_channel_without_on_exception_sets_dlc() {
1826 let definition = RouteBuilder::from("direct:start")
1827 .route_id("test-route")
1828 .dead_letter_channel("log:dlc")
1829 .to("mock:out")
1830 .build()
1831 .expect("route should build");
1832
1833 let cfg = definition
1834 .error_handler_config()
1835 .expect("error handler should be set");
1836 assert_eq!(cfg.dlc_uri.as_deref(), Some("log:dlc"));
1837 assert!(cfg.policies.is_empty());
1838 }
1839
1840 #[test]
1841 fn test_builder_dead_letter_channel_called_twice_uses_latest_and_keeps_policies() {
1842 let definition = RouteBuilder::from("direct:start")
1843 .route_id("test-route")
1844 .dead_letter_channel("log:first")
1845 .on_exception(|e| matches!(e, CamelError::Io(_)))
1846 .retry(2)
1847 .end_on_exception()
1848 .dead_letter_channel("log:second")
1849 .to("mock:out")
1850 .build()
1851 .expect("route should build");
1852
1853 let cfg = definition
1854 .error_handler_config()
1855 .expect("error handler should be set");
1856 assert_eq!(cfg.dlc_uri.as_deref(), Some("log:second"));
1857 assert_eq!(cfg.policies.len(), 1);
1858 assert_eq!(
1859 cfg.policies[0].retry.as_ref().map(|p| p.max_attempts),
1860 Some(2)
1861 );
1862 }
1863
1864 #[test]
1865 fn test_builder_on_exception_without_dlc_defaults_to_log_only() {
1866 let definition = RouteBuilder::from("direct:start")
1867 .route_id("test-route")
1868 .on_exception(|e| matches!(e, CamelError::ProcessorError(_)))
1869 .retry(1)
1870 .end_on_exception()
1871 .to("mock:out")
1872 .build()
1873 .expect("route should build");
1874
1875 let cfg = definition
1876 .error_handler_config()
1877 .expect("error handler should be set");
1878 assert!(cfg.dlc_uri.is_none());
1879 assert_eq!(cfg.policies.len(), 1);
1880 }
1881
1882 #[test]
1883 fn test_builder_error_handler_explicit_overwrite_stays_explicit_mode() {
1884 let first = ErrorHandlerConfig::dead_letter_channel("log:first");
1885 let second = ErrorHandlerConfig::dead_letter_channel("log:second");
1886
1887 let definition = RouteBuilder::from("direct:start")
1888 .route_id("test-route")
1889 .error_handler(first)
1890 .error_handler(second)
1891 .to("mock:out")
1892 .build()
1893 .expect("route should build");
1894
1895 let cfg = definition
1896 .error_handler_config()
1897 .expect("error handler should be set");
1898 assert_eq!(cfg.dlc_uri.as_deref(), Some("log:second"));
1899 }
1900
1901 #[test]
1904 fn test_split_builder_typestate() {
1905 use camel_api::splitter::{SplitterConfig, split_body_lines};
1906
1907 let definition = RouteBuilder::from("timer:test?period=1000")
1909 .route_id("test-route")
1910 .split(SplitterConfig::new(split_body_lines()))
1911 .to("mock:per-fragment")
1912 .end_split()
1913 .to("mock:final")
1914 .build()
1915 .unwrap();
1916
1917 assert_eq!(definition.steps().len(), 2);
1919 }
1920
1921 #[test]
1922 fn test_split_builder_steps_collected() {
1923 use camel_api::splitter::{SplitterConfig, split_body_lines};
1924
1925 let definition = RouteBuilder::from("timer:test?period=1000")
1926 .route_id("test-route")
1927 .split(SplitterConfig::new(split_body_lines()))
1928 .set_header("fragment", Value::String("yes".into()))
1929 .to("mock:per-fragment")
1930 .end_split()
1931 .build()
1932 .unwrap();
1933
1934 assert_eq!(definition.steps().len(), 1);
1936 match &definition.steps()[0] {
1937 BuilderStep::Split { steps, .. } => {
1938 assert_eq!(steps.len(), 2); }
1940 other => panic!("Expected Split, got {:?}", other),
1941 }
1942 }
1943
1944 #[test]
1945 fn test_split_builder_config_propagated() {
1946 use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};
1947
1948 let definition = RouteBuilder::from("timer:test?period=1000")
1949 .route_id("test-route")
1950 .split(
1951 SplitterConfig::new(split_body_lines())
1952 .parallel(true)
1953 .parallel_limit(4)
1954 .aggregation(AggregationStrategy::CollectAll),
1955 )
1956 .to("mock:per-fragment")
1957 .end_split()
1958 .build()
1959 .unwrap();
1960
1961 match &definition.steps()[0] {
1962 BuilderStep::Split { config, .. } => {
1963 assert!(config.parallel);
1964 assert_eq!(config.parallel_limit, Some(4));
1965 assert!(matches!(
1966 config.aggregation,
1967 AggregationStrategy::CollectAll
1968 ));
1969 }
1970 other => panic!("Expected Split, got {:?}", other),
1971 }
1972 }
1973
1974 #[test]
1975 fn test_aggregate_builder_adds_step() {
1976 use camel_api::aggregator::AggregatorConfig;
1977 use camel_core::route::BuilderStep;
1978
1979 let definition = RouteBuilder::from("timer:tick")
1980 .route_id("test-route")
1981 .aggregate(
1982 AggregatorConfig::correlate_by("key")
1983 .complete_when_size(2)
1984 .build(),
1985 )
1986 .build()
1987 .unwrap();
1988
1989 assert_eq!(definition.steps().len(), 1);
1990 assert!(matches!(
1991 definition.steps()[0],
1992 BuilderStep::Aggregate { .. }
1993 ));
1994 }
1995
1996 #[test]
1997 fn test_aggregate_in_split_builder() {
1998 use camel_api::aggregator::AggregatorConfig;
1999 use camel_api::splitter::{SplitterConfig, split_body_lines};
2000 use camel_core::route::BuilderStep;
2001
2002 let definition = RouteBuilder::from("timer:tick")
2003 .route_id("test-route")
2004 .split(SplitterConfig::new(split_body_lines()))
2005 .aggregate(
2006 AggregatorConfig::correlate_by("key")
2007 .complete_when_size(1)
2008 .build(),
2009 )
2010 .end_split()
2011 .build()
2012 .unwrap();
2013
2014 assert_eq!(definition.steps().len(), 1);
2015 if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
2016 assert!(matches!(steps[0], BuilderStep::Aggregate { .. }));
2017 } else {
2018 panic!("expected Split step");
2019 }
2020 }
2021
2022 #[test]
2025 fn test_builder_set_body_static_adds_processor() {
2026 let definition = RouteBuilder::from("timer:tick")
2027 .route_id("test-route")
2028 .set_body("fixed")
2029 .build()
2030 .unwrap();
2031 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2032 }
2033
2034 #[test]
2035 fn test_builder_set_body_fn_adds_processor() {
2036 let definition = RouteBuilder::from("timer:tick")
2037 .route_id("test-route")
2038 .set_body_fn(|_ex: &Exchange| Body::Text("dynamic".into()))
2039 .build()
2040 .unwrap();
2041 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2042 }
2043
2044 #[test]
2045 fn transform_alias_produces_same_as_set_body() {
2046 let route_transform = RouteBuilder::from("timer:tick")
2047 .route_id("test-route")
2048 .transform("hello")
2049 .build()
2050 .unwrap();
2051
2052 let route_set_body = RouteBuilder::from("timer:tick")
2053 .route_id("test-route")
2054 .set_body("hello")
2055 .build()
2056 .unwrap();
2057
2058 assert_eq!(route_transform.steps().len(), route_set_body.steps().len());
2059 }
2060
2061 #[test]
2062 fn test_builder_set_header_fn_adds_processor() {
2063 let definition = RouteBuilder::from("timer:tick")
2064 .route_id("test-route")
2065 .set_header_fn("k", |_ex: &Exchange| Value::String("v".into()))
2066 .build()
2067 .unwrap();
2068 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2069 }
2070
2071 #[tokio::test]
2072 async fn test_set_body_static_processor_works() {
2073 use camel_core::route::compose_pipeline;
2074 let def = RouteBuilder::from("t:t")
2075 .route_id("test-route")
2076 .set_body("replaced")
2077 .build()
2078 .unwrap();
2079 let pipeline = compose_pipeline(
2080 def.steps()
2081 .iter()
2082 .filter_map(|s| {
2083 if let BuilderStep::Processor(p) = s {
2084 Some(p.clone())
2085 } else {
2086 None
2087 }
2088 })
2089 .collect(),
2090 );
2091 let exchange = Exchange::new(Message::new("original"));
2092 let result = pipeline.oneshot(exchange).await.unwrap();
2093 assert_eq!(result.input.body.as_text(), Some("replaced"));
2094 }
2095
2096 #[tokio::test]
2097 async fn test_set_body_fn_processor_works() {
2098 use camel_core::route::compose_pipeline;
2099 let def = RouteBuilder::from("t:t")
2100 .route_id("test-route")
2101 .set_body_fn(|ex: &Exchange| {
2102 Body::Text(ex.input.body.as_text().unwrap_or("").to_uppercase())
2103 })
2104 .build()
2105 .unwrap();
2106 let pipeline = compose_pipeline(
2107 def.steps()
2108 .iter()
2109 .filter_map(|s| {
2110 if let BuilderStep::Processor(p) = s {
2111 Some(p.clone())
2112 } else {
2113 None
2114 }
2115 })
2116 .collect(),
2117 );
2118 let exchange = Exchange::new(Message::new("hello"));
2119 let result = pipeline.oneshot(exchange).await.unwrap();
2120 assert_eq!(result.input.body.as_text(), Some("HELLO"));
2121 }
2122
2123 #[tokio::test]
2124 async fn test_set_header_fn_processor_works() {
2125 use camel_core::route::compose_pipeline;
2126 let def = RouteBuilder::from("t:t")
2127 .route_id("test-route")
2128 .set_header_fn("echo", |ex: &Exchange| {
2129 ex.input
2130 .body
2131 .as_text()
2132 .map(|t| Value::String(t.into()))
2133 .unwrap_or(Value::Null)
2134 })
2135 .build()
2136 .unwrap();
2137 let pipeline = compose_pipeline(
2138 def.steps()
2139 .iter()
2140 .filter_map(|s| {
2141 if let BuilderStep::Processor(p) = s {
2142 Some(p.clone())
2143 } else {
2144 None
2145 }
2146 })
2147 .collect(),
2148 );
2149 let exchange = Exchange::new(Message::new("ping"));
2150 let result = pipeline.oneshot(exchange).await.unwrap();
2151 assert_eq!(
2152 result.input.header("echo"),
2153 Some(&Value::String("ping".into()))
2154 );
2155 }
2156
2157 #[test]
2160 fn test_filter_builder_typestate() {
2161 let result = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
2162 .route_id("test-route")
2163 .filter(|_ex| true)
2164 .to("mock:inner")
2165 .end_filter()
2166 .to("mock:outer")
2167 .build();
2168 assert!(result.is_ok());
2169 }
2170
2171 #[test]
2172 fn test_filter_builder_steps_collected() {
2173 let definition = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
2174 .route_id("test-route")
2175 .filter(|_ex| true)
2176 .to("mock:inner")
2177 .end_filter()
2178 .build()
2179 .unwrap();
2180
2181 assert_eq!(definition.steps().len(), 1);
2182 assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
2183 }
2184
2185 #[test]
2186 fn test_wire_tap_builder_adds_step() {
2187 let definition = RouteBuilder::from("timer:tick")
2188 .route_id("test-route")
2189 .wire_tap("mock:tap")
2190 .to("mock:result")
2191 .build()
2192 .unwrap();
2193
2194 assert_eq!(definition.steps().len(), 2);
2195 assert!(
2196 matches!(&definition.steps()[0], BuilderStep::WireTap { uri } if uri == "mock:tap")
2197 );
2198 assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:result"));
2199 }
2200
2201 #[test]
2204 fn test_multicast_builder_typestate() {
2205 let definition = RouteBuilder::from("timer:tick")
2206 .route_id("test-route")
2207 .multicast()
2208 .to("direct:a")
2209 .to("direct:b")
2210 .end_multicast()
2211 .to("mock:result")
2212 .build()
2213 .unwrap();
2214
2215 assert_eq!(definition.steps().len(), 2); }
2217
2218 #[test]
2219 fn test_multicast_builder_steps_collected() {
2220 let definition = RouteBuilder::from("timer:tick")
2221 .route_id("test-route")
2222 .multicast()
2223 .to("direct:a")
2224 .to("direct:b")
2225 .end_multicast()
2226 .build()
2227 .unwrap();
2228
2229 match &definition.steps()[0] {
2230 BuilderStep::Multicast { steps, .. } => {
2231 assert_eq!(steps.len(), 2);
2232 }
2233 other => panic!("Expected Multicast, got {:?}", other),
2234 }
2235 }
2236
2237 #[test]
2240 fn test_builder_concurrent_sets_concurrency() {
2241 use camel_component_api::ConcurrencyModel;
2242
2243 let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2244 .route_id("test-route")
2245 .concurrent(16)
2246 .to("log:info")
2247 .build()
2248 .unwrap();
2249
2250 assert_eq!(
2251 definition.concurrency_override(),
2252 Some(&ConcurrencyModel::Concurrent { max: Some(16) })
2253 );
2254 }
2255
2256 #[test]
2257 fn test_builder_concurrent_zero_means_unbounded() {
2258 use camel_component_api::ConcurrencyModel;
2259
2260 let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2261 .route_id("test-route")
2262 .concurrent(0)
2263 .to("log:info")
2264 .build()
2265 .unwrap();
2266
2267 assert_eq!(
2268 definition.concurrency_override(),
2269 Some(&ConcurrencyModel::Concurrent { max: None })
2270 );
2271 }
2272
2273 #[test]
2274 fn test_builder_sequential_sets_concurrency() {
2275 use camel_component_api::ConcurrencyModel;
2276
2277 let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2278 .route_id("test-route")
2279 .sequential()
2280 .to("log:info")
2281 .build()
2282 .unwrap();
2283
2284 assert_eq!(
2285 definition.concurrency_override(),
2286 Some(&ConcurrencyModel::Sequential)
2287 );
2288 }
2289
2290 #[test]
2291 fn test_builder_default_concurrency_is_none() {
2292 let definition = RouteBuilder::from("timer:tick")
2293 .route_id("test-route")
2294 .to("log:info")
2295 .build()
2296 .unwrap();
2297
2298 assert_eq!(definition.concurrency_override(), None);
2299 }
2300
2301 #[test]
2304 fn test_builder_route_id_sets_id() {
2305 let definition = RouteBuilder::from("timer:tick")
2306 .route_id("my-route")
2307 .build()
2308 .unwrap();
2309
2310 assert_eq!(definition.route_id(), "my-route");
2311 }
2312
2313 #[test]
2314 fn test_build_without_route_id_fails() {
2315 let result = RouteBuilder::from("timer:tick?period=1000")
2316 .to("log:info")
2317 .build();
2318 let err = match result {
2319 Err(e) => e.to_string(),
2320 Ok(_) => panic!("build() should fail without route_id"),
2321 };
2322 assert!(
2323 err.contains("route_id"),
2324 "error should mention route_id, got: {}",
2325 err
2326 );
2327 }
2328
2329 #[test]
2330 fn test_builder_auto_startup_false() {
2331 let definition = RouteBuilder::from("timer:tick")
2332 .route_id("test-route")
2333 .auto_startup(false)
2334 .build()
2335 .unwrap();
2336
2337 assert!(!definition.auto_startup());
2338 }
2339
2340 #[test]
2341 fn test_builder_startup_order_custom() {
2342 let definition = RouteBuilder::from("timer:tick")
2343 .route_id("test-route")
2344 .startup_order(50)
2345 .build()
2346 .unwrap();
2347
2348 assert_eq!(definition.startup_order(), 50);
2349 }
2350
2351 #[test]
2352 fn test_builder_defaults() {
2353 let definition = RouteBuilder::from("timer:tick")
2354 .route_id("test-route")
2355 .build()
2356 .unwrap();
2357
2358 assert_eq!(definition.route_id(), "test-route");
2359 assert!(definition.auto_startup());
2360 assert_eq!(definition.startup_order(), 1000);
2361 }
2362
2363 #[test]
2366 fn test_choice_builder_single_when() {
2367 let definition = RouteBuilder::from("timer:tick")
2368 .route_id("test-route")
2369 .choice()
2370 .when(|ex: &Exchange| ex.input.header("type").is_some())
2371 .to("mock:typed")
2372 .end_when()
2373 .end_choice()
2374 .build()
2375 .unwrap();
2376 assert_eq!(definition.steps().len(), 1);
2377 assert!(
2378 matches!(&definition.steps()[0], BuilderStep::Choice { whens, otherwise }
2379 if whens.len() == 1 && otherwise.is_none())
2380 );
2381 }
2382
2383 #[test]
2384 fn test_choice_builder_when_otherwise() {
2385 let definition = RouteBuilder::from("timer:tick")
2386 .route_id("test-route")
2387 .choice()
2388 .when(|ex: &Exchange| ex.input.header("a").is_some())
2389 .to("mock:a")
2390 .end_when()
2391 .otherwise()
2392 .to("mock:fallback")
2393 .end_otherwise()
2394 .end_choice()
2395 .build()
2396 .unwrap();
2397 assert!(
2398 matches!(&definition.steps()[0], BuilderStep::Choice { whens, otherwise }
2399 if whens.len() == 1 && otherwise.is_some())
2400 );
2401 }
2402
2403 #[test]
2404 fn test_choice_builder_multiple_whens() {
2405 let definition = RouteBuilder::from("timer:tick")
2406 .route_id("test-route")
2407 .choice()
2408 .when(|ex: &Exchange| ex.input.header("a").is_some())
2409 .to("mock:a")
2410 .end_when()
2411 .when(|ex: &Exchange| ex.input.header("b").is_some())
2412 .to("mock:b")
2413 .end_when()
2414 .end_choice()
2415 .build()
2416 .unwrap();
2417 assert!(
2418 matches!(&definition.steps()[0], BuilderStep::Choice { whens, .. }
2419 if whens.len() == 2)
2420 );
2421 }
2422
2423 #[test]
2424 fn test_choice_step_after_choice() {
2425 let definition = RouteBuilder::from("timer:tick")
2427 .route_id("test-route")
2428 .choice()
2429 .when(|_ex: &Exchange| true)
2430 .to("mock:inner")
2431 .end_when()
2432 .end_choice()
2433 .to("mock:outer") .build()
2435 .unwrap();
2436 assert_eq!(definition.steps().len(), 2);
2437 assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2438 }
2439
2440 #[test]
2443 fn test_throttle_builder_typestate() {
2444 let definition = RouteBuilder::from("timer:tick")
2445 .route_id("test-route")
2446 .throttle(10, std::time::Duration::from_secs(1))
2447 .to("mock:result")
2448 .end_throttle()
2449 .build()
2450 .unwrap();
2451
2452 assert_eq!(definition.steps().len(), 1);
2453 assert!(matches!(
2454 &definition.steps()[0],
2455 BuilderStep::Throttle { .. }
2456 ));
2457 }
2458
2459 #[test]
2460 fn test_throttle_builder_with_strategy() {
2461 let definition = RouteBuilder::from("timer:tick")
2462 .route_id("test-route")
2463 .throttle(10, std::time::Duration::from_secs(1))
2464 .strategy(ThrottleStrategy::Reject)
2465 .to("mock:result")
2466 .end_throttle()
2467 .build()
2468 .unwrap();
2469
2470 if let BuilderStep::Throttle { config, .. } = &definition.steps()[0] {
2471 assert_eq!(config.strategy, ThrottleStrategy::Reject);
2472 } else {
2473 panic!("Expected Throttle step");
2474 }
2475 }
2476
2477 #[test]
2478 fn test_throttle_builder_steps_collected() {
2479 let definition = RouteBuilder::from("timer:tick")
2480 .route_id("test-route")
2481 .throttle(5, std::time::Duration::from_secs(1))
2482 .set_header("throttled", Value::Bool(true))
2483 .to("mock:throttled")
2484 .end_throttle()
2485 .build()
2486 .unwrap();
2487
2488 match &definition.steps()[0] {
2489 BuilderStep::Throttle { steps, .. } => {
2490 assert_eq!(steps.len(), 2); }
2492 other => panic!("Expected Throttle, got {:?}", other),
2493 }
2494 }
2495
2496 #[test]
2497 fn test_throttle_step_after_throttle() {
2498 let definition = RouteBuilder::from("timer:tick")
2500 .route_id("test-route")
2501 .throttle(10, std::time::Duration::from_secs(1))
2502 .to("mock:inner")
2503 .end_throttle()
2504 .to("mock:outer")
2505 .build()
2506 .unwrap();
2507
2508 assert_eq!(definition.steps().len(), 2);
2509 assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2510 }
2511
2512 #[test]
2515 fn test_load_balance_builder_typestate() {
2516 let definition = RouteBuilder::from("timer:tick")
2517 .route_id("test-route")
2518 .load_balance()
2519 .round_robin()
2520 .to("mock:a")
2521 .to("mock:b")
2522 .end_load_balance()
2523 .build()
2524 .unwrap();
2525
2526 assert_eq!(definition.steps().len(), 1);
2527 assert!(matches!(
2528 &definition.steps()[0],
2529 BuilderStep::LoadBalance { .. }
2530 ));
2531 }
2532
2533 #[test]
2534 fn test_load_balance_builder_with_strategy() {
2535 let definition = RouteBuilder::from("timer:tick")
2536 .route_id("test-route")
2537 .load_balance()
2538 .random()
2539 .to("mock:result")
2540 .end_load_balance()
2541 .build()
2542 .unwrap();
2543
2544 if let BuilderStep::LoadBalance { config, .. } = &definition.steps()[0] {
2545 assert_eq!(config.strategy, LoadBalanceStrategy::Random);
2546 } else {
2547 panic!("Expected LoadBalance step");
2548 }
2549 }
2550
2551 #[test]
2552 fn test_load_balance_builder_steps_collected() {
2553 let definition = RouteBuilder::from("timer:tick")
2554 .route_id("test-route")
2555 .load_balance()
2556 .set_header("lb", Value::Bool(true))
2557 .to("mock:a")
2558 .end_load_balance()
2559 .build()
2560 .unwrap();
2561
2562 match &definition.steps()[0] {
2563 BuilderStep::LoadBalance { steps, .. } => {
2564 assert_eq!(steps.len(), 2); }
2566 other => panic!("Expected LoadBalance, got {:?}", other),
2567 }
2568 }
2569
2570 #[test]
2571 fn test_load_balance_step_after_load_balance() {
2572 let definition = RouteBuilder::from("timer:tick")
2574 .route_id("test-route")
2575 .load_balance()
2576 .to("mock:inner")
2577 .end_load_balance()
2578 .to("mock:outer")
2579 .build()
2580 .unwrap();
2581
2582 assert_eq!(definition.steps().len(), 2);
2583 assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2584 }
2585
2586 #[test]
2589 fn test_dynamic_router_builder() {
2590 let definition = RouteBuilder::from("timer:tick")
2591 .route_id("test-route")
2592 .dynamic_router(Arc::new(|_| Some("mock:result".to_string())))
2593 .build()
2594 .unwrap();
2595
2596 assert_eq!(definition.steps().len(), 1);
2597 assert!(matches!(
2598 &definition.steps()[0],
2599 BuilderStep::DynamicRouter { .. }
2600 ));
2601 }
2602
2603 #[test]
2604 fn test_dynamic_router_builder_with_config() {
2605 let config = DynamicRouterConfig::new(Arc::new(|_| Some("mock:a".to_string())))
2606 .max_iterations(100)
2607 .cache_size(500);
2608
2609 let definition = RouteBuilder::from("timer:tick")
2610 .route_id("test-route")
2611 .dynamic_router_with_config(config)
2612 .build()
2613 .unwrap();
2614
2615 assert_eq!(definition.steps().len(), 1);
2616 if let BuilderStep::DynamicRouter { config } = &definition.steps()[0] {
2617 assert_eq!(config.max_iterations, 100);
2618 assert_eq!(config.cache_size, 500);
2619 } else {
2620 panic!("Expected DynamicRouter step");
2621 }
2622 }
2623
2624 #[test]
2625 fn test_dynamic_router_step_after_router() {
2626 let definition = RouteBuilder::from("timer:tick")
2628 .route_id("test-route")
2629 .dynamic_router(Arc::new(|_| Some("mock:inner".to_string())))
2630 .to("mock:outer")
2631 .build()
2632 .unwrap();
2633
2634 assert_eq!(definition.steps().len(), 2);
2635 assert!(matches!(
2636 &definition.steps()[0],
2637 BuilderStep::DynamicRouter { .. }
2638 ));
2639 assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2640 }
2641
2642 #[test]
2643 fn routing_slip_builder_creates_step() {
2644 use camel_api::RoutingSlipExpression;
2645
2646 let expression: RoutingSlipExpression = Arc::new(|_| Some("direct:a,direct:b".to_string()));
2647
2648 let route = RouteBuilder::from("direct:start")
2649 .route_id("routing-slip-test")
2650 .routing_slip(expression)
2651 .build()
2652 .unwrap();
2653
2654 assert!(
2655 matches!(route.steps()[0], BuilderStep::RoutingSlip { .. }),
2656 "Expected RoutingSlip step"
2657 );
2658 }
2659
2660 #[test]
2661 fn routing_slip_with_config_builder_creates_step() {
2662 use camel_api::RoutingSlipConfig;
2663
2664 let config = RoutingSlipConfig::new(Arc::new(|_| Some("mock:a".to_string())))
2665 .uri_delimiter("|")
2666 .cache_size(50)
2667 .ignore_invalid_endpoints(true);
2668
2669 let route = RouteBuilder::from("direct:start")
2670 .route_id("routing-slip-config-test")
2671 .routing_slip_with_config(config)
2672 .build()
2673 .unwrap();
2674
2675 if let BuilderStep::RoutingSlip { config } = &route.steps()[0] {
2676 assert_eq!(config.uri_delimiter, "|");
2677 assert_eq!(config.cache_size, 50);
2678 assert!(config.ignore_invalid_endpoints);
2679 } else {
2680 panic!("Expected RoutingSlip step");
2681 }
2682 }
2683
2684 #[test]
2685 fn test_builder_marshal_adds_processor_step() {
2686 let definition = RouteBuilder::from("timer:tick")
2687 .route_id("test-route")
2688 .marshal("json")
2689 .build()
2690 .unwrap();
2691 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2692 }
2693
2694 #[test]
2695 fn test_builder_unmarshal_adds_processor_step() {
2696 let definition = RouteBuilder::from("timer:tick")
2697 .route_id("test-route")
2698 .unmarshal("json")
2699 .build()
2700 .unwrap();
2701 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2702 }
2703
2704 #[test]
2705 fn test_builder_stream_cache_adds_processor_step() {
2706 let definition = RouteBuilder::from("timer:tick")
2707 .route_id("test-route")
2708 .stream_cache(1024)
2709 .build()
2710 .unwrap();
2711 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2712 }
2713
2714 #[test]
2715 fn validate_adds_to_step_with_validator_uri() {
2716 let def = RouteBuilder::from("direct:in")
2717 .route_id("test")
2718 .validate("schemas/order.xsd")
2719 .build()
2720 .unwrap();
2721 let steps = def.steps();
2722 assert_eq!(steps.len(), 1);
2723 assert!(
2724 matches!(&steps[0], BuilderStep::To(uri) if uri == "validator:schemas/order.xsd"),
2725 "got: {:?}",
2726 steps[0]
2727 );
2728 }
2729
2730 #[test]
2731 #[should_panic(expected = "unknown data format: 'csv'")]
2732 fn test_builder_marshal_panics_on_unknown_format() {
2733 let _ = RouteBuilder::from("timer:tick")
2734 .route_id("test-route")
2735 .marshal("csv")
2736 .build();
2737 }
2738
2739 #[test]
2740 #[should_panic(expected = "unknown data format: 'csv'")]
2741 fn test_builder_unmarshal_panics_on_unknown_format() {
2742 let _ = RouteBuilder::from("timer:tick")
2743 .route_id("test-route")
2744 .unmarshal("csv")
2745 .build();
2746 }
2747}