1use camel_api::aggregator::{
2 AggregationStrategy, AggregatorConfig, CompletionCondition, CompletionMode, CorrelationStrategy,
3};
4use camel_api::body::Body;
5use camel_api::body_converter::BodyType;
6use camel_api::circuit_breaker::CircuitBreakerConfig;
7use camel_api::dynamic_router::{DynamicRouterConfig, RouterExpression};
8use camel_api::error_handler::{ErrorHandlerConfig, RedeliveryPolicy};
9use camel_api::load_balancer::LoadBalancerConfig;
10use camel_api::multicast::{MulticastConfig, MulticastStrategy};
11use camel_api::routing_slip::{RoutingSlipConfig, RoutingSlipExpression};
12use camel_api::splitter::SplitterConfig;
13use camel_api::throttler::{ThrottleStrategy, ThrottlerConfig};
14use camel_api::{
15 BoxProcessor, CamelError, CanonicalRouteSpec, Exchange, FilterPredicate, IdentityProcessor,
16 ProcessorFn, Value,
17 runtime::{
18 CanonicalAggregateSpec, CanonicalAggregateStrategySpec, CanonicalCircuitBreakerSpec,
19 CanonicalSplitAggregationSpec, CanonicalSplitExpressionSpec, CanonicalStepSpec,
20 CanonicalWhenSpec,
21 },
22};
23use camel_component::ConcurrencyModel;
24use camel_core::route::{BuilderStep, DeclarativeWhenStep, RouteDefinition, WhenStep};
25use camel_processor::{
26 ConvertBodyTo, DynamicSetHeader, LogLevel, MapBody, MarshalService, SetBody, SetHeader,
27 UnmarshalService, builtin_data_format,
28};
29
30pub trait StepAccumulator: Sized {
36 fn steps_mut(&mut self) -> &mut Vec<BuilderStep>;
37
38 fn to(mut self, endpoint: impl Into<String>) -> Self {
39 self.steps_mut().push(BuilderStep::To(endpoint.into()));
40 self
41 }
42
43 fn process<F, Fut>(mut self, f: F) -> Self
44 where
45 F: Fn(Exchange) -> Fut + Send + Sync + 'static,
46 Fut: std::future::Future<Output = Result<Exchange, CamelError>> + Send + 'static,
47 {
48 let svc = ProcessorFn::new(f);
49 self.steps_mut()
50 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
51 self
52 }
53
54 fn process_fn(mut self, processor: BoxProcessor) -> Self {
55 self.steps_mut().push(BuilderStep::Processor(processor));
56 self
57 }
58
59 fn set_header(mut self, key: impl Into<String>, value: impl Into<Value>) -> Self {
60 let svc = SetHeader::new(IdentityProcessor, key, value);
61 self.steps_mut()
62 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
63 self
64 }
65
66 fn map_body<F>(mut self, mapper: F) -> Self
67 where
68 F: Fn(Body) -> Body + Clone + Send + Sync + 'static,
69 {
70 let svc = MapBody::new(IdentityProcessor, mapper);
71 self.steps_mut()
72 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
73 self
74 }
75
76 fn set_body<B>(mut self, body: B) -> Self
77 where
78 B: Into<Body> + Clone + Send + Sync + 'static,
79 {
80 let body: Body = body.into();
81 let svc = SetBody::new(IdentityProcessor, move |_ex: &Exchange| body.clone());
82 self.steps_mut()
83 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
84 self
85 }
86
87 fn transform<B>(self, body: B) -> Self
92 where
93 B: Into<Body> + Clone + Send + Sync + 'static,
94 {
95 self.set_body(body)
96 }
97
98 fn set_body_fn<F>(mut self, expr: F) -> Self
99 where
100 F: Fn(&Exchange) -> Body + Clone + Send + Sync + 'static,
101 {
102 let svc = SetBody::new(IdentityProcessor, expr);
103 self.steps_mut()
104 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
105 self
106 }
107
108 fn set_header_fn<F>(mut self, key: impl Into<String>, expr: F) -> Self
109 where
110 F: Fn(&Exchange) -> Value + Clone + Send + Sync + 'static,
111 {
112 let svc = DynamicSetHeader::new(IdentityProcessor, key, expr);
113 self.steps_mut()
114 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
115 self
116 }
117
118 fn aggregate(mut self, config: AggregatorConfig) -> Self {
119 self.steps_mut().push(BuilderStep::Aggregate { config });
120 self
121 }
122
123 fn stop(mut self) -> Self {
129 self.steps_mut().push(BuilderStep::Stop);
130 self
131 }
132
133 fn log(mut self, message: impl Into<String>, level: LogLevel) -> Self {
137 self.steps_mut().push(BuilderStep::Log {
138 level,
139 message: message.into(),
140 });
141 self
142 }
143
144 fn convert_body_to(mut self, target: BodyType) -> Self {
156 let svc = ConvertBodyTo::new(IdentityProcessor, target);
157 self.steps_mut()
158 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
159 self
160 }
161
162 fn marshal(mut self, format: impl Into<String>) -> Self {
172 let name = format.into();
173 let df =
174 builtin_data_format(&name).unwrap_or_else(|| panic!("unknown data format: '{name}'"));
175 let svc = MarshalService::new(IdentityProcessor, df);
176 self.steps_mut()
177 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
178 self
179 }
180
181 fn unmarshal(mut self, format: impl Into<String>) -> Self {
191 let name = format.into();
192 let df =
193 builtin_data_format(&name).unwrap_or_else(|| panic!("unknown data format: '{name}'"));
194 let svc = UnmarshalService::new(IdentityProcessor, df);
195 self.steps_mut()
196 .push(BuilderStep::Processor(BoxProcessor::new(svc)));
197 self
198 }
199
200 fn script(mut self, language: impl Into<String>, script: impl Into<String>) -> Self {
211 self.steps_mut().push(BuilderStep::Script {
212 language: language.into(),
213 script: script.into(),
214 });
215 self
216 }
217}
218
219pub struct RouteBuilder {
231 from_uri: String,
232 steps: Vec<BuilderStep>,
233 error_handler: Option<ErrorHandlerConfig>,
234 error_handler_mode: ErrorHandlerMode,
235 circuit_breaker_config: Option<CircuitBreakerConfig>,
236 concurrency: Option<ConcurrencyModel>,
237 route_id: Option<String>,
238 auto_startup: Option<bool>,
239 startup_order: Option<i32>,
240}
241
242#[derive(Default)]
243enum ErrorHandlerMode {
244 #[default]
245 None,
246 ExplicitConfig,
247 Shorthand {
248 dlc_uri: Option<String>,
249 specs: Vec<OnExceptionSpec>,
250 },
251 Mixed,
252}
253
254#[derive(Clone)]
255struct OnExceptionSpec {
256 matches: std::sync::Arc<dyn Fn(&CamelError) -> bool + Send + Sync>,
257 retry: Option<RedeliveryPolicy>,
258 handled_by: Option<String>,
259}
260
261impl RouteBuilder {
262 pub fn from(endpoint: &str) -> Self {
264 Self {
265 from_uri: endpoint.to_string(),
266 steps: Vec::new(),
267 error_handler: None,
268 error_handler_mode: ErrorHandlerMode::None,
269 circuit_breaker_config: None,
270 concurrency: None,
271 route_id: None,
272 auto_startup: None,
273 startup_order: None,
274 }
275 }
276
277 pub fn filter<F>(self, predicate: F) -> FilterBuilder
281 where
282 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
283 {
284 FilterBuilder {
285 parent: self,
286 predicate: std::sync::Arc::new(predicate),
287 steps: vec![],
288 }
289 }
290
291 pub fn choice(self) -> ChoiceBuilder {
297 ChoiceBuilder {
298 parent: self,
299 whens: vec![],
300 _otherwise: None,
301 }
302 }
303
304 pub fn wire_tap(mut self, endpoint: &str) -> Self {
308 self.steps.push(BuilderStep::WireTap {
309 uri: endpoint.to_string(),
310 });
311 self
312 }
313
314 pub fn error_handler(mut self, config: ErrorHandlerConfig) -> Self {
316 self.error_handler_mode = match self.error_handler_mode {
317 ErrorHandlerMode::None | ErrorHandlerMode::ExplicitConfig => {
318 ErrorHandlerMode::ExplicitConfig
319 }
320 ErrorHandlerMode::Shorthand { .. } | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
321 };
322 self.error_handler = Some(config);
323 self
324 }
325
326 pub fn dead_letter_channel(mut self, uri: impl Into<String>) -> Self {
328 let uri = uri.into();
329 self.error_handler_mode = match self.error_handler_mode {
330 ErrorHandlerMode::None => ErrorHandlerMode::Shorthand {
331 dlc_uri: Some(uri),
332 specs: Vec::new(),
333 },
334 ErrorHandlerMode::Shorthand { specs, .. } => ErrorHandlerMode::Shorthand {
335 dlc_uri: Some(uri),
336 specs,
337 },
338 ErrorHandlerMode::ExplicitConfig | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
339 };
340 self
341 }
342
343 pub fn on_exception<F>(mut self, matches: F) -> OnExceptionBuilder
345 where
346 F: Fn(&CamelError) -> bool + Send + Sync + 'static,
347 {
348 self.error_handler_mode = match self.error_handler_mode {
349 ErrorHandlerMode::None => ErrorHandlerMode::Shorthand {
350 dlc_uri: None,
351 specs: Vec::new(),
352 },
353 ErrorHandlerMode::ExplicitConfig | ErrorHandlerMode::Mixed => ErrorHandlerMode::Mixed,
354 shorthand @ ErrorHandlerMode::Shorthand { .. } => shorthand,
355 };
356
357 OnExceptionBuilder {
358 parent: self,
359 policy: OnExceptionSpec {
360 matches: std::sync::Arc::new(matches),
361 retry: None,
362 handled_by: None,
363 },
364 }
365 }
366
367 pub fn circuit_breaker(mut self, config: CircuitBreakerConfig) -> Self {
369 self.circuit_breaker_config = Some(config);
370 self
371 }
372
373 pub fn concurrent(mut self, max: usize) -> Self {
387 let max = if max == 0 { None } else { Some(max) };
388 self.concurrency = Some(ConcurrencyModel::Concurrent { max });
389 self
390 }
391
392 pub fn sequential(mut self) -> Self {
397 self.concurrency = Some(ConcurrencyModel::Sequential);
398 self
399 }
400
401 pub fn route_id(mut self, id: impl Into<String>) -> Self {
405 self.route_id = Some(id.into());
406 self
407 }
408
409 pub fn auto_startup(mut self, auto: bool) -> Self {
413 self.auto_startup = Some(auto);
414 self
415 }
416
417 pub fn startup_order(mut self, order: i32) -> Self {
421 self.startup_order = Some(order);
422 self
423 }
424
425 pub fn split(self, config: SplitterConfig) -> SplitBuilder {
431 SplitBuilder {
432 parent: self,
433 config,
434 steps: Vec::new(),
435 }
436 }
437
438 pub fn multicast(self) -> MulticastBuilder {
444 MulticastBuilder {
445 parent: self,
446 steps: Vec::new(),
447 config: MulticastConfig::new(),
448 }
449 }
450
451 pub fn throttle(self, max_requests: usize, period: std::time::Duration) -> ThrottleBuilder {
458 ThrottleBuilder {
459 parent: self,
460 config: ThrottlerConfig::new(max_requests, period),
461 steps: Vec::new(),
462 }
463 }
464
465 pub fn load_balance(self) -> LoadBalancerBuilder {
471 LoadBalancerBuilder {
472 parent: self,
473 config: LoadBalancerConfig::round_robin(),
474 steps: Vec::new(),
475 }
476 }
477
478 pub fn dynamic_router(self, expression: RouterExpression) -> Self {
494 self.dynamic_router_with_config(DynamicRouterConfig::new(expression))
495 }
496
497 pub fn dynamic_router_with_config(mut self, config: DynamicRouterConfig) -> Self {
501 self.steps.push(BuilderStep::DynamicRouter { config });
502 self
503 }
504
505 pub fn routing_slip(self, expression: RoutingSlipExpression) -> Self {
506 self.routing_slip_with_config(RoutingSlipConfig::new(expression))
507 }
508
509 pub fn routing_slip_with_config(mut self, config: RoutingSlipConfig) -> Self {
510 self.steps.push(BuilderStep::RoutingSlip { config });
511 self
512 }
513
514 pub fn build(self) -> Result<RouteDefinition, CamelError> {
516 if self.from_uri.is_empty() {
517 return Err(CamelError::RouteError(
518 "route must have a 'from' URI".to_string(),
519 ));
520 }
521 let route_id = self.route_id.ok_or_else(|| {
522 CamelError::RouteError(
523 "route must have a 'route_id' — call .route_id(\"name\") on the builder"
524 .to_string(),
525 )
526 })?;
527 let resolved_error_handler = match self.error_handler_mode {
528 ErrorHandlerMode::None => self.error_handler,
529 ErrorHandlerMode::ExplicitConfig => self.error_handler,
530 ErrorHandlerMode::Mixed => {
531 return Err(CamelError::RouteError(
532 "mixed error handler modes: cannot combine .error_handler(config) with shorthand methods".into(),
533 ));
534 }
535 ErrorHandlerMode::Shorthand { dlc_uri, specs } => {
536 let mut config = if let Some(uri) = dlc_uri {
537 ErrorHandlerConfig::dead_letter_channel(uri)
538 } else {
539 ErrorHandlerConfig::log_only()
540 };
541
542 for spec in specs {
543 let matcher = spec.matches.clone();
544 let mut builder = config.on_exception(move |e| matcher(e));
545
546 if let Some(retry) = spec.retry {
547 builder = builder.retry(retry.max_attempts).with_backoff(
548 retry.initial_delay,
549 retry.multiplier,
550 retry.max_delay,
551 );
552 if retry.jitter_factor > 0.0 {
553 builder = builder.with_jitter(retry.jitter_factor);
554 }
555 }
556
557 if let Some(uri) = spec.handled_by {
558 builder = builder.handled_by(uri);
559 }
560
561 config = builder.build();
562 }
563
564 Some(config)
565 }
566 };
567
568 let definition = RouteDefinition::new(self.from_uri, self.steps);
569 let definition = if let Some(eh) = resolved_error_handler {
570 definition.with_error_handler(eh)
571 } else {
572 definition
573 };
574 let definition = if let Some(cb) = self.circuit_breaker_config {
575 definition.with_circuit_breaker(cb)
576 } else {
577 definition
578 };
579 let definition = if let Some(concurrency) = self.concurrency {
580 definition.with_concurrency(concurrency)
581 } else {
582 definition
583 };
584 let definition = definition.with_route_id(route_id);
585 let definition = if let Some(auto) = self.auto_startup {
586 definition.with_auto_startup(auto)
587 } else {
588 definition
589 };
590 let definition = if let Some(order) = self.startup_order {
591 definition.with_startup_order(order)
592 } else {
593 definition
594 };
595 Ok(definition)
596 }
597
598 pub fn build_canonical(self) -> Result<CanonicalRouteSpec, CamelError> {
600 if self.from_uri.is_empty() {
601 return Err(CamelError::RouteError(
602 "route must have a 'from' URI".to_string(),
603 ));
604 }
605 let route_id = self.route_id.ok_or_else(|| {
606 CamelError::RouteError(
607 "route must have a 'route_id' — call .route_id(\"name\") on the builder"
608 .to_string(),
609 )
610 })?;
611
612 let steps = canonicalize_steps(self.steps)?;
613 let circuit_breaker = self
614 .circuit_breaker_config
615 .map(canonicalize_circuit_breaker);
616
617 let spec = CanonicalRouteSpec {
618 route_id,
619 from: self.from_uri,
620 steps,
621 circuit_breaker,
622 version: camel_api::CANONICAL_CONTRACT_VERSION,
623 };
624 spec.validate_contract()?;
625 Ok(spec)
626 }
627}
628
629pub struct OnExceptionBuilder {
630 parent: RouteBuilder,
631 policy: OnExceptionSpec,
632}
633
634impl OnExceptionBuilder {
635 pub fn retry(mut self, max_attempts: u32) -> Self {
636 self.policy.retry = Some(RedeliveryPolicy::new(max_attempts));
637 self
638 }
639
640 pub fn with_backoff(
641 mut self,
642 initial: std::time::Duration,
643 multiplier: f64,
644 max: std::time::Duration,
645 ) -> Self {
646 if let Some(ref mut retry) = self.policy.retry {
647 retry.initial_delay = initial;
648 retry.multiplier = multiplier;
649 retry.max_delay = max;
650 }
651 self
652 }
653
654 pub fn with_jitter(mut self, jitter_factor: f64) -> Self {
655 if let Some(ref mut retry) = self.policy.retry {
656 retry.jitter_factor = jitter_factor.clamp(0.0, 1.0);
657 }
658 self
659 }
660
661 pub fn handled_by(mut self, uri: impl Into<String>) -> Self {
662 self.policy.handled_by = Some(uri.into());
663 self
664 }
665
666 pub fn end_on_exception(mut self) -> RouteBuilder {
667 if let ErrorHandlerMode::Shorthand { ref mut specs, .. } = self.parent.error_handler_mode {
668 specs.push(self.policy);
669 }
670 self.parent
671 }
672}
673
674fn canonicalize_steps(steps: Vec<BuilderStep>) -> Result<Vec<CanonicalStepSpec>, CamelError> {
675 let mut canonical = Vec::with_capacity(steps.len());
676 for step in steps {
677 canonical.push(canonicalize_step(step)?);
678 }
679 Ok(canonical)
680}
681
682fn canonicalize_step(step: BuilderStep) -> Result<CanonicalStepSpec, CamelError> {
683 match step {
684 BuilderStep::To(uri) => Ok(CanonicalStepSpec::To { uri }),
685 BuilderStep::Log { message, .. } => Ok(CanonicalStepSpec::Log { message }),
686 BuilderStep::Stop => Ok(CanonicalStepSpec::Stop),
687 BuilderStep::WireTap { uri } => Ok(CanonicalStepSpec::WireTap { uri }),
688 BuilderStep::DeclarativeScript { expression } => {
689 Ok(CanonicalStepSpec::Script { expression })
690 }
691 BuilderStep::DeclarativeFilter { predicate, steps } => Ok(CanonicalStepSpec::Filter {
692 predicate,
693 steps: canonicalize_steps(steps)?,
694 }),
695 BuilderStep::DeclarativeChoice { whens, otherwise } => {
696 let mut canonical_whens = Vec::with_capacity(whens.len());
697 for DeclarativeWhenStep { predicate, steps } in whens {
698 canonical_whens.push(CanonicalWhenSpec {
699 predicate,
700 steps: canonicalize_steps(steps)?,
701 });
702 }
703 let otherwise = match otherwise {
704 Some(steps) => Some(canonicalize_steps(steps)?),
705 None => None,
706 };
707 Ok(CanonicalStepSpec::Choice {
708 whens: canonical_whens,
709 otherwise,
710 })
711 }
712 BuilderStep::DeclarativeSplit {
713 expression,
714 aggregation,
715 parallel,
716 parallel_limit,
717 stop_on_exception,
718 steps,
719 } => Ok(CanonicalStepSpec::Split {
720 expression: CanonicalSplitExpressionSpec::Language(expression),
721 aggregation: canonicalize_split_aggregation(aggregation)?,
722 parallel,
723 parallel_limit,
724 stop_on_exception,
725 steps: canonicalize_steps(steps)?,
726 }),
727 BuilderStep::Aggregate { config } => Ok(CanonicalStepSpec::Aggregate {
728 config: canonicalize_aggregate(config)?,
729 }),
730 other => {
731 let step_name = canonical_step_name(&other);
732 let detail = camel_api::canonical_contract_rejection_reason(step_name)
733 .unwrap_or("not included in canonical v1");
734 Err(CamelError::RouteError(format!(
735 "canonical v1 does not support step `{step_name}`: {detail}"
736 )))
737 }
738 }
739}
740
741fn canonicalize_split_aggregation(
742 strategy: camel_api::splitter::AggregationStrategy,
743) -> Result<CanonicalSplitAggregationSpec, CamelError> {
744 match strategy {
745 camel_api::splitter::AggregationStrategy::LastWins => {
746 Ok(CanonicalSplitAggregationSpec::LastWins)
747 }
748 camel_api::splitter::AggregationStrategy::CollectAll => {
749 Ok(CanonicalSplitAggregationSpec::CollectAll)
750 }
751 camel_api::splitter::AggregationStrategy::Custom(_) => Err(CamelError::RouteError(
752 "canonical v1 does not support custom split aggregation".to_string(),
753 )),
754 camel_api::splitter::AggregationStrategy::Original => {
755 Ok(CanonicalSplitAggregationSpec::Original)
756 }
757 }
758}
759
760fn extract_completion_fields(
761 mode: &CompletionMode,
762) -> Result<(Option<usize>, Option<u64>), CamelError> {
763 match mode {
764 CompletionMode::Single(cond) => match cond {
765 CompletionCondition::Size(n) => Ok((Some(*n), None)),
766 CompletionCondition::Timeout(d) => Ok((None, Some(d.as_millis() as u64))),
767 CompletionCondition::Predicate(_) => Err(CamelError::RouteError(
768 "canonical v1 does not support aggregate predicate completion".to_string(),
769 )),
770 },
771 CompletionMode::Any(conds) => {
772 let mut size = None;
773 let mut timeout_ms = None;
774 for cond in conds {
775 match cond {
776 CompletionCondition::Size(n) => size = Some(*n),
777 CompletionCondition::Timeout(d) => timeout_ms = Some(d.as_millis() as u64),
778 CompletionCondition::Predicate(_) => {
779 return Err(CamelError::RouteError(
780 "canonical v1 does not support aggregate predicate completion"
781 .to_string(),
782 ));
783 }
784 }
785 }
786 Ok((size, timeout_ms))
787 }
788 }
789}
790
791fn canonicalize_aggregate(config: AggregatorConfig) -> Result<CanonicalAggregateSpec, CamelError> {
792 let (completion_size, completion_timeout_ms) = extract_completion_fields(&config.completion)?;
793
794 let header = match &config.correlation {
795 CorrelationStrategy::HeaderName(h) => h.clone(),
796 CorrelationStrategy::Expression { expr, .. } => expr.clone(),
797 CorrelationStrategy::Fn(_) => {
798 return Err(CamelError::RouteError(
799 "canonical v1 does not support Fn correlation strategy".to_string(),
800 ));
801 }
802 };
803
804 let correlation_key = match &config.correlation {
805 CorrelationStrategy::HeaderName(_) => None,
806 CorrelationStrategy::Expression { expr, .. } => Some(expr.clone()),
807 CorrelationStrategy::Fn(_) => unreachable!(),
808 };
809
810 let strategy = match config.strategy {
811 AggregationStrategy::CollectAll => CanonicalAggregateStrategySpec::CollectAll,
812 AggregationStrategy::Custom(_) => {
813 return Err(CamelError::RouteError(
814 "canonical v1 does not support custom aggregate strategy".to_string(),
815 ));
816 }
817 };
818 let bucket_ttl_ms = config
819 .bucket_ttl
820 .map(|ttl| u64::try_from(ttl.as_millis()).unwrap_or(u64::MAX));
821
822 Ok(CanonicalAggregateSpec {
823 header,
824 completion_size,
825 completion_timeout_ms,
826 correlation_key,
827 force_completion_on_stop: if config.force_completion_on_stop {
828 Some(true)
829 } else {
830 None
831 },
832 discard_on_timeout: if config.discard_on_timeout {
833 Some(true)
834 } else {
835 None
836 },
837 strategy,
838 max_buckets: config.max_buckets,
839 bucket_ttl_ms,
840 })
841}
842
843fn canonicalize_circuit_breaker(config: CircuitBreakerConfig) -> CanonicalCircuitBreakerSpec {
844 CanonicalCircuitBreakerSpec {
845 failure_threshold: config.failure_threshold,
846 open_duration_ms: u64::try_from(config.open_duration.as_millis()).unwrap_or(u64::MAX),
847 }
848}
849
850fn canonical_step_name(step: &BuilderStep) -> &'static str {
851 match step {
852 BuilderStep::Processor(_) => "processor",
853 BuilderStep::To(_) => "to",
854 BuilderStep::Stop => "stop",
855 BuilderStep::Log { .. } => "log",
856 BuilderStep::DeclarativeSetHeader { .. } => "set_header",
857 BuilderStep::DeclarativeSetBody { .. } => "set_body",
858 BuilderStep::DeclarativeFilter { .. } => "filter",
859 BuilderStep::DeclarativeChoice { .. } => "choice",
860 BuilderStep::DeclarativeScript { .. } => "script",
861 BuilderStep::DeclarativeSplit { .. } => "split",
862 BuilderStep::Split { .. } => "split",
863 BuilderStep::Aggregate { .. } => "aggregate",
864 BuilderStep::Filter { .. } => "filter",
865 BuilderStep::Choice { .. } => "choice",
866 BuilderStep::WireTap { .. } => "wire_tap",
867 BuilderStep::Multicast { .. } => "multicast",
868 BuilderStep::DeclarativeLog { .. } => "log",
869 BuilderStep::Bean { .. } => "bean",
870 BuilderStep::Script { .. } => "script",
871 BuilderStep::Throttle { .. } => "throttle",
872 BuilderStep::LoadBalance { .. } => "load_balancer",
873 BuilderStep::DynamicRouter { .. } => "dynamic_router",
874 BuilderStep::RoutingSlip { .. } => "routing_slip",
875 }
876}
877
878impl StepAccumulator for RouteBuilder {
879 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
880 &mut self.steps
881 }
882}
883
884pub struct SplitBuilder {
892 parent: RouteBuilder,
893 config: SplitterConfig,
894 steps: Vec<BuilderStep>,
895}
896
897impl SplitBuilder {
898 pub fn filter<F>(self, predicate: F) -> FilterInSplitBuilder
900 where
901 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
902 {
903 FilterInSplitBuilder {
904 parent: self,
905 predicate: std::sync::Arc::new(predicate),
906 steps: vec![],
907 }
908 }
909
910 pub fn end_split(mut self) -> RouteBuilder {
913 let split_step = BuilderStep::Split {
914 config: self.config,
915 steps: self.steps,
916 };
917 self.parent.steps.push(split_step);
918 self.parent
919 }
920}
921
922impl StepAccumulator for SplitBuilder {
923 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
924 &mut self.steps
925 }
926}
927
928pub struct FilterBuilder {
930 parent: RouteBuilder,
931 predicate: FilterPredicate,
932 steps: Vec<BuilderStep>,
933}
934
935impl FilterBuilder {
936 pub fn end_filter(mut self) -> RouteBuilder {
939 let step = BuilderStep::Filter {
940 predicate: self.predicate,
941 steps: self.steps,
942 };
943 self.parent.steps.push(step);
944 self.parent
945 }
946}
947
948impl StepAccumulator for FilterBuilder {
949 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
950 &mut self.steps
951 }
952}
953
954pub struct FilterInSplitBuilder {
956 parent: SplitBuilder,
957 predicate: FilterPredicate,
958 steps: Vec<BuilderStep>,
959}
960
961impl FilterInSplitBuilder {
962 pub fn end_filter(mut self) -> SplitBuilder {
964 let step = BuilderStep::Filter {
965 predicate: self.predicate,
966 steps: self.steps,
967 };
968 self.parent.steps.push(step);
969 self.parent
970 }
971}
972
973impl StepAccumulator for FilterInSplitBuilder {
974 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
975 &mut self.steps
976 }
977}
978
979pub struct ChoiceBuilder {
986 parent: RouteBuilder,
987 whens: Vec<WhenStep>,
988 _otherwise: Option<Vec<BuilderStep>>,
989}
990
991impl ChoiceBuilder {
992 pub fn when<F>(self, predicate: F) -> WhenBuilder
995 where
996 F: Fn(&Exchange) -> bool + Send + Sync + 'static,
997 {
998 WhenBuilder {
999 parent: self,
1000 predicate: std::sync::Arc::new(predicate),
1001 steps: vec![],
1002 }
1003 }
1004
1005 pub fn otherwise(self) -> OtherwiseBuilder {
1009 OtherwiseBuilder {
1010 parent: self,
1011 steps: vec![],
1012 }
1013 }
1014
1015 pub fn end_choice(mut self) -> RouteBuilder {
1019 let step = BuilderStep::Choice {
1020 whens: self.whens,
1021 otherwise: self._otherwise,
1022 };
1023 self.parent.steps.push(step);
1024 self.parent
1025 }
1026}
1027
1028pub struct WhenBuilder {
1030 parent: ChoiceBuilder,
1031 predicate: camel_api::FilterPredicate,
1032 steps: Vec<BuilderStep>,
1033}
1034
1035impl WhenBuilder {
1036 pub fn end_when(mut self) -> ChoiceBuilder {
1039 self.parent.whens.push(WhenStep {
1040 predicate: self.predicate,
1041 steps: self.steps,
1042 });
1043 self.parent
1044 }
1045}
1046
1047impl StepAccumulator for WhenBuilder {
1048 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1049 &mut self.steps
1050 }
1051}
1052
1053pub struct OtherwiseBuilder {
1055 parent: ChoiceBuilder,
1056 steps: Vec<BuilderStep>,
1057}
1058
1059impl OtherwiseBuilder {
1060 pub fn end_otherwise(self) -> ChoiceBuilder {
1062 let OtherwiseBuilder { mut parent, steps } = self;
1063 parent._otherwise = Some(steps);
1064 parent
1065 }
1066}
1067
1068impl StepAccumulator for OtherwiseBuilder {
1069 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1070 &mut self.steps
1071 }
1072}
1073
1074pub struct MulticastBuilder {
1082 parent: RouteBuilder,
1083 steps: Vec<BuilderStep>,
1084 config: MulticastConfig,
1085}
1086
1087impl MulticastBuilder {
1088 pub fn parallel(mut self, parallel: bool) -> Self {
1089 self.config = self.config.parallel(parallel);
1090 self
1091 }
1092
1093 pub fn parallel_limit(mut self, limit: usize) -> Self {
1094 self.config = self.config.parallel_limit(limit);
1095 self
1096 }
1097
1098 pub fn stop_on_exception(mut self, stop: bool) -> Self {
1099 self.config = self.config.stop_on_exception(stop);
1100 self
1101 }
1102
1103 pub fn timeout(mut self, duration: std::time::Duration) -> Self {
1104 self.config = self.config.timeout(duration);
1105 self
1106 }
1107
1108 pub fn aggregation(mut self, strategy: MulticastStrategy) -> Self {
1109 self.config = self.config.aggregation(strategy);
1110 self
1111 }
1112
1113 pub fn end_multicast(mut self) -> RouteBuilder {
1114 let step = BuilderStep::Multicast {
1115 steps: self.steps,
1116 config: self.config,
1117 };
1118 self.parent.steps.push(step);
1119 self.parent
1120 }
1121}
1122
1123impl StepAccumulator for MulticastBuilder {
1124 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1125 &mut self.steps
1126 }
1127}
1128
1129pub struct ThrottleBuilder {
1137 parent: RouteBuilder,
1138 config: ThrottlerConfig,
1139 steps: Vec<BuilderStep>,
1140}
1141
1142impl ThrottleBuilder {
1143 pub fn strategy(mut self, strategy: ThrottleStrategy) -> Self {
1149 self.config = self.config.strategy(strategy);
1150 self
1151 }
1152
1153 pub fn end_throttle(mut self) -> RouteBuilder {
1156 let step = BuilderStep::Throttle {
1157 config: self.config,
1158 steps: self.steps,
1159 };
1160 self.parent.steps.push(step);
1161 self.parent
1162 }
1163}
1164
1165impl StepAccumulator for ThrottleBuilder {
1166 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1167 &mut self.steps
1168 }
1169}
1170
1171pub struct LoadBalancerBuilder {
1179 parent: RouteBuilder,
1180 config: LoadBalancerConfig,
1181 steps: Vec<BuilderStep>,
1182}
1183
1184impl LoadBalancerBuilder {
1185 pub fn round_robin(mut self) -> Self {
1187 self.config = LoadBalancerConfig::round_robin();
1188 self
1189 }
1190
1191 pub fn random(mut self) -> Self {
1193 self.config = LoadBalancerConfig::random();
1194 self
1195 }
1196
1197 pub fn weighted(mut self, weights: Vec<(String, u32)>) -> Self {
1202 self.config = LoadBalancerConfig::weighted(weights);
1203 self
1204 }
1205
1206 pub fn failover(mut self) -> Self {
1211 self.config = LoadBalancerConfig::failover();
1212 self
1213 }
1214
1215 pub fn parallel(mut self, parallel: bool) -> Self {
1220 self.config = self.config.parallel(parallel);
1221 self
1222 }
1223
1224 pub fn end_load_balance(mut self) -> RouteBuilder {
1227 let step = BuilderStep::LoadBalance {
1228 config: self.config,
1229 steps: self.steps,
1230 };
1231 self.parent.steps.push(step);
1232 self.parent
1233 }
1234}
1235
1236impl StepAccumulator for LoadBalancerBuilder {
1237 fn steps_mut(&mut self) -> &mut Vec<BuilderStep> {
1238 &mut self.steps
1239 }
1240}
1241
1242#[cfg(test)]
1247mod tests {
1248 use super::*;
1249 use camel_api::error_handler::ErrorHandlerConfig;
1250 use camel_api::load_balancer::LoadBalanceStrategy;
1251 use camel_api::{Exchange, Message};
1252 use camel_core::route::BuilderStep;
1253 use std::sync::Arc;
1254 use std::time::Duration;
1255 use tower::{Service, ServiceExt};
1256
1257 #[test]
1258 fn test_builder_from_creates_definition() {
1259 let definition = RouteBuilder::from("timer:tick")
1260 .route_id("test-route")
1261 .build()
1262 .unwrap();
1263 assert_eq!(definition.from_uri(), "timer:tick");
1264 }
1265
1266 #[test]
1267 fn test_builder_empty_from_uri_errors() {
1268 let result = RouteBuilder::from("").route_id("test-route").build();
1269 assert!(result.is_err());
1270 }
1271
1272 #[test]
1273 fn test_builder_to_adds_step() {
1274 let definition = RouteBuilder::from("timer:tick")
1275 .route_id("test-route")
1276 .to("log:info")
1277 .build()
1278 .unwrap();
1279
1280 assert_eq!(definition.from_uri(), "timer:tick");
1281 assert!(matches!(&definition.steps()[0], BuilderStep::To(uri) if uri == "log:info"));
1283 }
1284
1285 #[test]
1286 fn test_builder_filter_adds_filter_step() {
1287 let definition = RouteBuilder::from("timer:tick")
1288 .route_id("test-route")
1289 .filter(|_ex| true)
1290 .to("mock:result")
1291 .end_filter()
1292 .build()
1293 .unwrap();
1294
1295 assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
1296 }
1297
1298 #[test]
1299 fn test_builder_set_header_adds_processor_step() {
1300 let definition = RouteBuilder::from("timer:tick")
1301 .route_id("test-route")
1302 .set_header("key", Value::String("value".into()))
1303 .build()
1304 .unwrap();
1305
1306 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1307 }
1308
1309 #[test]
1310 fn test_builder_map_body_adds_processor_step() {
1311 let definition = RouteBuilder::from("timer:tick")
1312 .route_id("test-route")
1313 .map_body(|body| body)
1314 .build()
1315 .unwrap();
1316
1317 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1318 }
1319
1320 #[test]
1321 fn test_builder_process_adds_processor_step() {
1322 let definition = RouteBuilder::from("timer:tick")
1323 .route_id("test-route")
1324 .process(|ex| async move { Ok(ex) })
1325 .build()
1326 .unwrap();
1327
1328 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1329 }
1330
1331 #[test]
1332 fn test_builder_chain_multiple_steps() {
1333 let definition = RouteBuilder::from("timer:tick")
1334 .route_id("test-route")
1335 .set_header("source", Value::String("timer".into()))
1336 .filter(|ex| ex.input.header("source").is_some())
1337 .to("log:info")
1338 .end_filter()
1339 .to("mock:result")
1340 .build()
1341 .unwrap();
1342
1343 assert_eq!(definition.steps().len(), 3); assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_))); assert!(matches!(&definition.steps()[1], BuilderStep::Filter { .. })); assert!(matches!(&definition.steps()[2], BuilderStep::To(uri) if uri == "mock:result"));
1347 }
1348
1349 #[tokio::test]
1354 async fn test_set_header_processor_works() {
1355 let mut svc = SetHeader::new(IdentityProcessor, "greeting", Value::String("hello".into()));
1356 let exchange = Exchange::new(Message::new("test"));
1357 let result = svc.call(exchange).await.unwrap();
1358 assert_eq!(
1359 result.input.header("greeting"),
1360 Some(&Value::String("hello".into()))
1361 );
1362 }
1363
1364 #[tokio::test]
1365 async fn test_filter_processor_passes() {
1366 use camel_api::BoxProcessorExt;
1367 use camel_processor::FilterService;
1368
1369 let sub = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
1370 let mut svc =
1371 FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
1372 let exchange = Exchange::new(Message::new("pass"));
1373 let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
1374 assert_eq!(result.input.body.as_text(), Some("pass"));
1375 }
1376
1377 #[tokio::test]
1378 async fn test_filter_processor_blocks() {
1379 use camel_api::BoxProcessorExt;
1380 use camel_processor::FilterService;
1381
1382 let sub = BoxProcessor::from_fn(|_ex| {
1383 Box::pin(async move { Err(CamelError::ProcessorError("should not reach".into())) })
1384 });
1385 let mut svc =
1386 FilterService::new(|ex: &Exchange| ex.input.body.as_text() == Some("pass"), sub);
1387 let exchange = Exchange::new(Message::new("reject"));
1388 let result = svc.ready().await.unwrap().call(exchange).await.unwrap();
1389 assert_eq!(result.input.body.as_text(), Some("reject"));
1390 }
1391
1392 #[tokio::test]
1393 async fn test_map_body_processor_works() {
1394 let mapper = MapBody::new(IdentityProcessor, |body: Body| {
1395 if let Some(text) = body.as_text() {
1396 Body::Text(text.to_uppercase())
1397 } else {
1398 body
1399 }
1400 });
1401 let exchange = Exchange::new(Message::new("hello"));
1402 let result = mapper.oneshot(exchange).await.unwrap();
1403 assert_eq!(result.input.body.as_text(), Some("HELLO"));
1404 }
1405
1406 #[tokio::test]
1407 async fn test_process_custom_processor_works() {
1408 let processor = ProcessorFn::new(|mut ex: Exchange| async move {
1409 ex.set_property("custom", Value::Bool(true));
1410 Ok(ex)
1411 });
1412 let exchange = Exchange::new(Message::default());
1413 let result = processor.oneshot(exchange).await.unwrap();
1414 assert_eq!(result.property("custom"), Some(&Value::Bool(true)));
1415 }
1416
1417 #[tokio::test]
1422 async fn test_compose_pipeline_runs_steps_in_order() {
1423 use camel_core::route::compose_pipeline;
1424
1425 let processors = vec![
1426 BoxProcessor::new(SetHeader::new(
1427 IdentityProcessor,
1428 "step",
1429 Value::String("one".into()),
1430 )),
1431 BoxProcessor::new(MapBody::new(IdentityProcessor, |body: Body| {
1432 if let Some(text) = body.as_text() {
1433 Body::Text(format!("{}-processed", text))
1434 } else {
1435 body
1436 }
1437 })),
1438 ];
1439
1440 let pipeline = compose_pipeline(processors);
1441 let exchange = Exchange::new(Message::new("hello"));
1442 let result = pipeline.oneshot(exchange).await.unwrap();
1443
1444 assert_eq!(
1445 result.input.header("step"),
1446 Some(&Value::String("one".into()))
1447 );
1448 assert_eq!(result.input.body.as_text(), Some("hello-processed"));
1449 }
1450
1451 #[tokio::test]
1452 async fn test_compose_pipeline_empty_is_identity() {
1453 use camel_core::route::compose_pipeline;
1454
1455 let pipeline = compose_pipeline(vec![]);
1456 let exchange = Exchange::new(Message::new("unchanged"));
1457 let result = pipeline.oneshot(exchange).await.unwrap();
1458 assert_eq!(result.input.body.as_text(), Some("unchanged"));
1459 }
1460
1461 #[test]
1466 fn test_builder_circuit_breaker_sets_config() {
1467 use camel_api::circuit_breaker::CircuitBreakerConfig;
1468
1469 let config = CircuitBreakerConfig::new().failure_threshold(5);
1470 let definition = RouteBuilder::from("timer:tick")
1471 .route_id("test-route")
1472 .circuit_breaker(config)
1473 .build()
1474 .unwrap();
1475
1476 let cb = definition
1477 .circuit_breaker_config()
1478 .expect("circuit breaker should be set");
1479 assert_eq!(cb.failure_threshold, 5);
1480 }
1481
1482 #[test]
1483 fn test_builder_circuit_breaker_with_error_handler() {
1484 use camel_api::circuit_breaker::CircuitBreakerConfig;
1485 use camel_api::error_handler::ErrorHandlerConfig;
1486
1487 let cb_config = CircuitBreakerConfig::new().failure_threshold(3);
1488 let eh_config = ErrorHandlerConfig::log_only();
1489
1490 let definition = RouteBuilder::from("timer:tick")
1491 .route_id("test-route")
1492 .to("log:info")
1493 .circuit_breaker(cb_config)
1494 .error_handler(eh_config)
1495 .build()
1496 .unwrap();
1497
1498 assert!(
1499 definition.circuit_breaker_config().is_some(),
1500 "circuit breaker config should be set"
1501 );
1502 }
1504
1505 #[test]
1506 fn test_builder_on_exception_shorthand_multiple_clauses_preserve_order() {
1507 let definition = RouteBuilder::from("direct:start")
1508 .route_id("test-route")
1509 .dead_letter_channel("log:dlc")
1510 .on_exception(|e| matches!(e, CamelError::Io(_)))
1511 .retry(3)
1512 .handled_by("log:io")
1513 .end_on_exception()
1514 .on_exception(|e| matches!(e, CamelError::ProcessorError(_)))
1515 .retry(1)
1516 .end_on_exception()
1517 .to("mock:out")
1518 .build()
1519 .expect("route should build");
1520
1521 let cfg = definition
1522 .error_handler_config()
1523 .expect("error handler should be set");
1524 assert_eq!(cfg.policies.len(), 2);
1525 assert_eq!(cfg.dlc_uri.as_deref(), Some("log:dlc"));
1526 assert_eq!(
1527 cfg.policies[0].retry.as_ref().map(|p| p.max_attempts),
1528 Some(3)
1529 );
1530 assert_eq!(cfg.policies[0].handled_by.as_deref(), Some("log:io"));
1531 assert_eq!(
1532 cfg.policies[1].retry.as_ref().map(|p| p.max_attempts),
1533 Some(1)
1534 );
1535 }
1536
1537 #[test]
1538 fn test_builder_on_exception_mixed_mode_rejected() {
1539 let result = RouteBuilder::from("direct:start")
1540 .route_id("test-route")
1541 .error_handler(ErrorHandlerConfig::log_only())
1542 .on_exception(|_e| true)
1543 .end_on_exception()
1544 .to("mock:out")
1545 .build();
1546
1547 let err = result.err().expect("mixed mode should fail with an error");
1548
1549 assert!(
1550 format!("{err}").contains("mixed error handler modes"),
1551 "unexpected error: {err}"
1552 );
1553 }
1554
1555 #[test]
1556 fn test_builder_on_exception_backoff_and_jitter_without_retry_noop() {
1557 let definition = RouteBuilder::from("direct:start")
1558 .route_id("test-route")
1559 .on_exception(|_e| true)
1560 .with_backoff(Duration::from_millis(5), 3.0, Duration::from_millis(100))
1561 .with_jitter(0.5)
1562 .end_on_exception()
1563 .to("mock:out")
1564 .build()
1565 .expect("route should build");
1566
1567 let cfg = definition
1568 .error_handler_config()
1569 .expect("error handler should be set");
1570 assert_eq!(cfg.policies.len(), 1);
1571 assert!(cfg.policies[0].retry.is_none());
1572 }
1573
1574 #[test]
1575 fn test_builder_dead_letter_channel_without_on_exception_sets_dlc() {
1576 let definition = RouteBuilder::from("direct:start")
1577 .route_id("test-route")
1578 .dead_letter_channel("log:dlc")
1579 .to("mock:out")
1580 .build()
1581 .expect("route should build");
1582
1583 let cfg = definition
1584 .error_handler_config()
1585 .expect("error handler should be set");
1586 assert_eq!(cfg.dlc_uri.as_deref(), Some("log:dlc"));
1587 assert!(cfg.policies.is_empty());
1588 }
1589
1590 #[test]
1591 fn test_builder_dead_letter_channel_called_twice_uses_latest_and_keeps_policies() {
1592 let definition = RouteBuilder::from("direct:start")
1593 .route_id("test-route")
1594 .dead_letter_channel("log:first")
1595 .on_exception(|e| matches!(e, CamelError::Io(_)))
1596 .retry(2)
1597 .end_on_exception()
1598 .dead_letter_channel("log:second")
1599 .to("mock:out")
1600 .build()
1601 .expect("route should build");
1602
1603 let cfg = definition
1604 .error_handler_config()
1605 .expect("error handler should be set");
1606 assert_eq!(cfg.dlc_uri.as_deref(), Some("log:second"));
1607 assert_eq!(cfg.policies.len(), 1);
1608 assert_eq!(
1609 cfg.policies[0].retry.as_ref().map(|p| p.max_attempts),
1610 Some(2)
1611 );
1612 }
1613
1614 #[test]
1615 fn test_builder_on_exception_without_dlc_defaults_to_log_only() {
1616 let definition = RouteBuilder::from("direct:start")
1617 .route_id("test-route")
1618 .on_exception(|e| matches!(e, CamelError::ProcessorError(_)))
1619 .retry(1)
1620 .end_on_exception()
1621 .to("mock:out")
1622 .build()
1623 .expect("route should build");
1624
1625 let cfg = definition
1626 .error_handler_config()
1627 .expect("error handler should be set");
1628 assert!(cfg.dlc_uri.is_none());
1629 assert_eq!(cfg.policies.len(), 1);
1630 }
1631
1632 #[test]
1633 fn test_builder_error_handler_explicit_overwrite_stays_explicit_mode() {
1634 let first = ErrorHandlerConfig::dead_letter_channel("log:first");
1635 let second = ErrorHandlerConfig::dead_letter_channel("log:second");
1636
1637 let definition = RouteBuilder::from("direct:start")
1638 .route_id("test-route")
1639 .error_handler(first)
1640 .error_handler(second)
1641 .to("mock:out")
1642 .build()
1643 .expect("route should build");
1644
1645 let cfg = definition
1646 .error_handler_config()
1647 .expect("error handler should be set");
1648 assert_eq!(cfg.dlc_uri.as_deref(), Some("log:second"));
1649 }
1650
1651 #[test]
1654 fn test_split_builder_typestate() {
1655 use camel_api::splitter::{SplitterConfig, split_body_lines};
1656
1657 let definition = RouteBuilder::from("timer:test?period=1000")
1659 .route_id("test-route")
1660 .split(SplitterConfig::new(split_body_lines()))
1661 .to("mock:per-fragment")
1662 .end_split()
1663 .to("mock:final")
1664 .build()
1665 .unwrap();
1666
1667 assert_eq!(definition.steps().len(), 2);
1669 }
1670
1671 #[test]
1672 fn test_split_builder_steps_collected() {
1673 use camel_api::splitter::{SplitterConfig, split_body_lines};
1674
1675 let definition = RouteBuilder::from("timer:test?period=1000")
1676 .route_id("test-route")
1677 .split(SplitterConfig::new(split_body_lines()))
1678 .set_header("fragment", Value::String("yes".into()))
1679 .to("mock:per-fragment")
1680 .end_split()
1681 .build()
1682 .unwrap();
1683
1684 assert_eq!(definition.steps().len(), 1);
1686 match &definition.steps()[0] {
1687 BuilderStep::Split { steps, .. } => {
1688 assert_eq!(steps.len(), 2); }
1690 other => panic!("Expected Split, got {:?}", other),
1691 }
1692 }
1693
1694 #[test]
1695 fn test_split_builder_config_propagated() {
1696 use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};
1697
1698 let definition = RouteBuilder::from("timer:test?period=1000")
1699 .route_id("test-route")
1700 .split(
1701 SplitterConfig::new(split_body_lines())
1702 .parallel(true)
1703 .parallel_limit(4)
1704 .aggregation(AggregationStrategy::CollectAll),
1705 )
1706 .to("mock:per-fragment")
1707 .end_split()
1708 .build()
1709 .unwrap();
1710
1711 match &definition.steps()[0] {
1712 BuilderStep::Split { config, .. } => {
1713 assert!(config.parallel);
1714 assert_eq!(config.parallel_limit, Some(4));
1715 assert!(matches!(
1716 config.aggregation,
1717 AggregationStrategy::CollectAll
1718 ));
1719 }
1720 other => panic!("Expected Split, got {:?}", other),
1721 }
1722 }
1723
1724 #[test]
1725 fn test_aggregate_builder_adds_step() {
1726 use camel_api::aggregator::AggregatorConfig;
1727 use camel_core::route::BuilderStep;
1728
1729 let definition = RouteBuilder::from("timer:tick")
1730 .route_id("test-route")
1731 .aggregate(
1732 AggregatorConfig::correlate_by("key")
1733 .complete_when_size(2)
1734 .build(),
1735 )
1736 .build()
1737 .unwrap();
1738
1739 assert_eq!(definition.steps().len(), 1);
1740 assert!(matches!(
1741 definition.steps()[0],
1742 BuilderStep::Aggregate { .. }
1743 ));
1744 }
1745
1746 #[test]
1747 fn test_aggregate_in_split_builder() {
1748 use camel_api::aggregator::AggregatorConfig;
1749 use camel_api::splitter::{SplitterConfig, split_body_lines};
1750 use camel_core::route::BuilderStep;
1751
1752 let definition = RouteBuilder::from("timer:tick")
1753 .route_id("test-route")
1754 .split(SplitterConfig::new(split_body_lines()))
1755 .aggregate(
1756 AggregatorConfig::correlate_by("key")
1757 .complete_when_size(1)
1758 .build(),
1759 )
1760 .end_split()
1761 .build()
1762 .unwrap();
1763
1764 assert_eq!(definition.steps().len(), 1);
1765 if let BuilderStep::Split { steps, .. } = &definition.steps()[0] {
1766 assert!(matches!(steps[0], BuilderStep::Aggregate { .. }));
1767 } else {
1768 panic!("expected Split step");
1769 }
1770 }
1771
1772 #[test]
1775 fn test_builder_set_body_static_adds_processor() {
1776 let definition = RouteBuilder::from("timer:tick")
1777 .route_id("test-route")
1778 .set_body("fixed")
1779 .build()
1780 .unwrap();
1781 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1782 }
1783
1784 #[test]
1785 fn test_builder_set_body_fn_adds_processor() {
1786 let definition = RouteBuilder::from("timer:tick")
1787 .route_id("test-route")
1788 .set_body_fn(|_ex: &Exchange| Body::Text("dynamic".into()))
1789 .build()
1790 .unwrap();
1791 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1792 }
1793
1794 #[test]
1795 fn transform_alias_produces_same_as_set_body() {
1796 let route_transform = RouteBuilder::from("timer:tick")
1797 .route_id("test-route")
1798 .transform("hello")
1799 .build()
1800 .unwrap();
1801
1802 let route_set_body = RouteBuilder::from("timer:tick")
1803 .route_id("test-route")
1804 .set_body("hello")
1805 .build()
1806 .unwrap();
1807
1808 assert_eq!(route_transform.steps().len(), route_set_body.steps().len());
1809 }
1810
1811 #[test]
1812 fn test_builder_set_header_fn_adds_processor() {
1813 let definition = RouteBuilder::from("timer:tick")
1814 .route_id("test-route")
1815 .set_header_fn("k", |_ex: &Exchange| Value::String("v".into()))
1816 .build()
1817 .unwrap();
1818 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
1819 }
1820
1821 #[tokio::test]
1822 async fn test_set_body_static_processor_works() {
1823 use camel_core::route::compose_pipeline;
1824 let def = RouteBuilder::from("t:t")
1825 .route_id("test-route")
1826 .set_body("replaced")
1827 .build()
1828 .unwrap();
1829 let pipeline = compose_pipeline(
1830 def.steps()
1831 .iter()
1832 .filter_map(|s| {
1833 if let BuilderStep::Processor(p) = s {
1834 Some(p.clone())
1835 } else {
1836 None
1837 }
1838 })
1839 .collect(),
1840 );
1841 let exchange = Exchange::new(Message::new("original"));
1842 let result = pipeline.oneshot(exchange).await.unwrap();
1843 assert_eq!(result.input.body.as_text(), Some("replaced"));
1844 }
1845
1846 #[tokio::test]
1847 async fn test_set_body_fn_processor_works() {
1848 use camel_core::route::compose_pipeline;
1849 let def = RouteBuilder::from("t:t")
1850 .route_id("test-route")
1851 .set_body_fn(|ex: &Exchange| {
1852 Body::Text(ex.input.body.as_text().unwrap_or("").to_uppercase())
1853 })
1854 .build()
1855 .unwrap();
1856 let pipeline = compose_pipeline(
1857 def.steps()
1858 .iter()
1859 .filter_map(|s| {
1860 if let BuilderStep::Processor(p) = s {
1861 Some(p.clone())
1862 } else {
1863 None
1864 }
1865 })
1866 .collect(),
1867 );
1868 let exchange = Exchange::new(Message::new("hello"));
1869 let result = pipeline.oneshot(exchange).await.unwrap();
1870 assert_eq!(result.input.body.as_text(), Some("HELLO"));
1871 }
1872
1873 #[tokio::test]
1874 async fn test_set_header_fn_processor_works() {
1875 use camel_core::route::compose_pipeline;
1876 let def = RouteBuilder::from("t:t")
1877 .route_id("test-route")
1878 .set_header_fn("echo", |ex: &Exchange| {
1879 ex.input
1880 .body
1881 .as_text()
1882 .map(|t| Value::String(t.into()))
1883 .unwrap_or(Value::Null)
1884 })
1885 .build()
1886 .unwrap();
1887 let pipeline = compose_pipeline(
1888 def.steps()
1889 .iter()
1890 .filter_map(|s| {
1891 if let BuilderStep::Processor(p) = s {
1892 Some(p.clone())
1893 } else {
1894 None
1895 }
1896 })
1897 .collect(),
1898 );
1899 let exchange = Exchange::new(Message::new("ping"));
1900 let result = pipeline.oneshot(exchange).await.unwrap();
1901 assert_eq!(
1902 result.input.header("echo"),
1903 Some(&Value::String("ping".into()))
1904 );
1905 }
1906
1907 #[test]
1910 fn test_filter_builder_typestate() {
1911 let result = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
1912 .route_id("test-route")
1913 .filter(|_ex| true)
1914 .to("mock:inner")
1915 .end_filter()
1916 .to("mock:outer")
1917 .build();
1918 assert!(result.is_ok());
1919 }
1920
1921 #[test]
1922 fn test_filter_builder_steps_collected() {
1923 let definition = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
1924 .route_id("test-route")
1925 .filter(|_ex| true)
1926 .to("mock:inner")
1927 .end_filter()
1928 .build()
1929 .unwrap();
1930
1931 assert_eq!(definition.steps().len(), 1);
1932 assert!(matches!(&definition.steps()[0], BuilderStep::Filter { .. }));
1933 }
1934
1935 #[test]
1936 fn test_wire_tap_builder_adds_step() {
1937 let definition = RouteBuilder::from("timer:tick")
1938 .route_id("test-route")
1939 .wire_tap("mock:tap")
1940 .to("mock:result")
1941 .build()
1942 .unwrap();
1943
1944 assert_eq!(definition.steps().len(), 2);
1945 assert!(
1946 matches!(&definition.steps()[0], BuilderStep::WireTap { uri } if uri == "mock:tap")
1947 );
1948 assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:result"));
1949 }
1950
1951 #[test]
1954 fn test_multicast_builder_typestate() {
1955 let definition = RouteBuilder::from("timer:tick")
1956 .route_id("test-route")
1957 .multicast()
1958 .to("direct:a")
1959 .to("direct:b")
1960 .end_multicast()
1961 .to("mock:result")
1962 .build()
1963 .unwrap();
1964
1965 assert_eq!(definition.steps().len(), 2); }
1967
1968 #[test]
1969 fn test_multicast_builder_steps_collected() {
1970 let definition = RouteBuilder::from("timer:tick")
1971 .route_id("test-route")
1972 .multicast()
1973 .to("direct:a")
1974 .to("direct:b")
1975 .end_multicast()
1976 .build()
1977 .unwrap();
1978
1979 match &definition.steps()[0] {
1980 BuilderStep::Multicast { steps, .. } => {
1981 assert_eq!(steps.len(), 2);
1982 }
1983 other => panic!("Expected Multicast, got {:?}", other),
1984 }
1985 }
1986
1987 #[test]
1990 fn test_builder_concurrent_sets_concurrency() {
1991 use camel_component::ConcurrencyModel;
1992
1993 let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
1994 .route_id("test-route")
1995 .concurrent(16)
1996 .to("log:info")
1997 .build()
1998 .unwrap();
1999
2000 assert_eq!(
2001 definition.concurrency_override(),
2002 Some(&ConcurrencyModel::Concurrent { max: Some(16) })
2003 );
2004 }
2005
2006 #[test]
2007 fn test_builder_concurrent_zero_means_unbounded() {
2008 use camel_component::ConcurrencyModel;
2009
2010 let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2011 .route_id("test-route")
2012 .concurrent(0)
2013 .to("log:info")
2014 .build()
2015 .unwrap();
2016
2017 assert_eq!(
2018 definition.concurrency_override(),
2019 Some(&ConcurrencyModel::Concurrent { max: None })
2020 );
2021 }
2022
2023 #[test]
2024 fn test_builder_sequential_sets_concurrency() {
2025 use camel_component::ConcurrencyModel;
2026
2027 let definition = RouteBuilder::from("http://0.0.0.0:8080/test")
2028 .route_id("test-route")
2029 .sequential()
2030 .to("log:info")
2031 .build()
2032 .unwrap();
2033
2034 assert_eq!(
2035 definition.concurrency_override(),
2036 Some(&ConcurrencyModel::Sequential)
2037 );
2038 }
2039
2040 #[test]
2041 fn test_builder_default_concurrency_is_none() {
2042 let definition = RouteBuilder::from("timer:tick")
2043 .route_id("test-route")
2044 .to("log:info")
2045 .build()
2046 .unwrap();
2047
2048 assert_eq!(definition.concurrency_override(), None);
2049 }
2050
2051 #[test]
2054 fn test_builder_route_id_sets_id() {
2055 let definition = RouteBuilder::from("timer:tick")
2056 .route_id("my-route")
2057 .build()
2058 .unwrap();
2059
2060 assert_eq!(definition.route_id(), "my-route");
2061 }
2062
2063 #[test]
2064 fn test_build_without_route_id_fails() {
2065 let result = RouteBuilder::from("timer:tick?period=1000")
2066 .to("log:info")
2067 .build();
2068 let err = match result {
2069 Err(e) => e.to_string(),
2070 Ok(_) => panic!("build() should fail without route_id"),
2071 };
2072 assert!(
2073 err.contains("route_id"),
2074 "error should mention route_id, got: {}",
2075 err
2076 );
2077 }
2078
2079 #[test]
2080 fn test_builder_auto_startup_false() {
2081 let definition = RouteBuilder::from("timer:tick")
2082 .route_id("test-route")
2083 .auto_startup(false)
2084 .build()
2085 .unwrap();
2086
2087 assert!(!definition.auto_startup());
2088 }
2089
2090 #[test]
2091 fn test_builder_startup_order_custom() {
2092 let definition = RouteBuilder::from("timer:tick")
2093 .route_id("test-route")
2094 .startup_order(50)
2095 .build()
2096 .unwrap();
2097
2098 assert_eq!(definition.startup_order(), 50);
2099 }
2100
2101 #[test]
2102 fn test_builder_defaults() {
2103 let definition = RouteBuilder::from("timer:tick")
2104 .route_id("test-route")
2105 .build()
2106 .unwrap();
2107
2108 assert_eq!(definition.route_id(), "test-route");
2109 assert!(definition.auto_startup());
2110 assert_eq!(definition.startup_order(), 1000);
2111 }
2112
2113 #[test]
2116 fn test_choice_builder_single_when() {
2117 let definition = RouteBuilder::from("timer:tick")
2118 .route_id("test-route")
2119 .choice()
2120 .when(|ex: &Exchange| ex.input.header("type").is_some())
2121 .to("mock:typed")
2122 .end_when()
2123 .end_choice()
2124 .build()
2125 .unwrap();
2126 assert_eq!(definition.steps().len(), 1);
2127 assert!(
2128 matches!(&definition.steps()[0], BuilderStep::Choice { whens, otherwise }
2129 if whens.len() == 1 && otherwise.is_none())
2130 );
2131 }
2132
2133 #[test]
2134 fn test_choice_builder_when_otherwise() {
2135 let definition = RouteBuilder::from("timer:tick")
2136 .route_id("test-route")
2137 .choice()
2138 .when(|ex: &Exchange| ex.input.header("a").is_some())
2139 .to("mock:a")
2140 .end_when()
2141 .otherwise()
2142 .to("mock:fallback")
2143 .end_otherwise()
2144 .end_choice()
2145 .build()
2146 .unwrap();
2147 assert!(
2148 matches!(&definition.steps()[0], BuilderStep::Choice { whens, otherwise }
2149 if whens.len() == 1 && otherwise.is_some())
2150 );
2151 }
2152
2153 #[test]
2154 fn test_choice_builder_multiple_whens() {
2155 let definition = RouteBuilder::from("timer:tick")
2156 .route_id("test-route")
2157 .choice()
2158 .when(|ex: &Exchange| ex.input.header("a").is_some())
2159 .to("mock:a")
2160 .end_when()
2161 .when(|ex: &Exchange| ex.input.header("b").is_some())
2162 .to("mock:b")
2163 .end_when()
2164 .end_choice()
2165 .build()
2166 .unwrap();
2167 assert!(
2168 matches!(&definition.steps()[0], BuilderStep::Choice { whens, .. }
2169 if whens.len() == 2)
2170 );
2171 }
2172
2173 #[test]
2174 fn test_choice_step_after_choice() {
2175 let definition = RouteBuilder::from("timer:tick")
2177 .route_id("test-route")
2178 .choice()
2179 .when(|_ex: &Exchange| true)
2180 .to("mock:inner")
2181 .end_when()
2182 .end_choice()
2183 .to("mock:outer") .build()
2185 .unwrap();
2186 assert_eq!(definition.steps().len(), 2);
2187 assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2188 }
2189
2190 #[test]
2193 fn test_throttle_builder_typestate() {
2194 let definition = RouteBuilder::from("timer:tick")
2195 .route_id("test-route")
2196 .throttle(10, std::time::Duration::from_secs(1))
2197 .to("mock:result")
2198 .end_throttle()
2199 .build()
2200 .unwrap();
2201
2202 assert_eq!(definition.steps().len(), 1);
2203 assert!(matches!(
2204 &definition.steps()[0],
2205 BuilderStep::Throttle { .. }
2206 ));
2207 }
2208
2209 #[test]
2210 fn test_throttle_builder_with_strategy() {
2211 let definition = RouteBuilder::from("timer:tick")
2212 .route_id("test-route")
2213 .throttle(10, std::time::Duration::from_secs(1))
2214 .strategy(ThrottleStrategy::Reject)
2215 .to("mock:result")
2216 .end_throttle()
2217 .build()
2218 .unwrap();
2219
2220 if let BuilderStep::Throttle { config, .. } = &definition.steps()[0] {
2221 assert_eq!(config.strategy, ThrottleStrategy::Reject);
2222 } else {
2223 panic!("Expected Throttle step");
2224 }
2225 }
2226
2227 #[test]
2228 fn test_throttle_builder_steps_collected() {
2229 let definition = RouteBuilder::from("timer:tick")
2230 .route_id("test-route")
2231 .throttle(5, std::time::Duration::from_secs(1))
2232 .set_header("throttled", Value::Bool(true))
2233 .to("mock:throttled")
2234 .end_throttle()
2235 .build()
2236 .unwrap();
2237
2238 match &definition.steps()[0] {
2239 BuilderStep::Throttle { steps, .. } => {
2240 assert_eq!(steps.len(), 2); }
2242 other => panic!("Expected Throttle, got {:?}", other),
2243 }
2244 }
2245
2246 #[test]
2247 fn test_throttle_step_after_throttle() {
2248 let definition = RouteBuilder::from("timer:tick")
2250 .route_id("test-route")
2251 .throttle(10, std::time::Duration::from_secs(1))
2252 .to("mock:inner")
2253 .end_throttle()
2254 .to("mock:outer")
2255 .build()
2256 .unwrap();
2257
2258 assert_eq!(definition.steps().len(), 2);
2259 assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2260 }
2261
2262 #[test]
2265 fn test_load_balance_builder_typestate() {
2266 let definition = RouteBuilder::from("timer:tick")
2267 .route_id("test-route")
2268 .load_balance()
2269 .round_robin()
2270 .to("mock:a")
2271 .to("mock:b")
2272 .end_load_balance()
2273 .build()
2274 .unwrap();
2275
2276 assert_eq!(definition.steps().len(), 1);
2277 assert!(matches!(
2278 &definition.steps()[0],
2279 BuilderStep::LoadBalance { .. }
2280 ));
2281 }
2282
2283 #[test]
2284 fn test_load_balance_builder_with_strategy() {
2285 let definition = RouteBuilder::from("timer:tick")
2286 .route_id("test-route")
2287 .load_balance()
2288 .random()
2289 .to("mock:result")
2290 .end_load_balance()
2291 .build()
2292 .unwrap();
2293
2294 if let BuilderStep::LoadBalance { config, .. } = &definition.steps()[0] {
2295 assert_eq!(config.strategy, LoadBalanceStrategy::Random);
2296 } else {
2297 panic!("Expected LoadBalance step");
2298 }
2299 }
2300
2301 #[test]
2302 fn test_load_balance_builder_steps_collected() {
2303 let definition = RouteBuilder::from("timer:tick")
2304 .route_id("test-route")
2305 .load_balance()
2306 .set_header("lb", Value::Bool(true))
2307 .to("mock:a")
2308 .end_load_balance()
2309 .build()
2310 .unwrap();
2311
2312 match &definition.steps()[0] {
2313 BuilderStep::LoadBalance { steps, .. } => {
2314 assert_eq!(steps.len(), 2); }
2316 other => panic!("Expected LoadBalance, got {:?}", other),
2317 }
2318 }
2319
2320 #[test]
2321 fn test_load_balance_step_after_load_balance() {
2322 let definition = RouteBuilder::from("timer:tick")
2324 .route_id("test-route")
2325 .load_balance()
2326 .to("mock:inner")
2327 .end_load_balance()
2328 .to("mock:outer")
2329 .build()
2330 .unwrap();
2331
2332 assert_eq!(definition.steps().len(), 2);
2333 assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2334 }
2335
2336 #[test]
2339 fn test_dynamic_router_builder() {
2340 let definition = RouteBuilder::from("timer:tick")
2341 .route_id("test-route")
2342 .dynamic_router(Arc::new(|_| Some("mock:result".to_string())))
2343 .build()
2344 .unwrap();
2345
2346 assert_eq!(definition.steps().len(), 1);
2347 assert!(matches!(
2348 &definition.steps()[0],
2349 BuilderStep::DynamicRouter { .. }
2350 ));
2351 }
2352
2353 #[test]
2354 fn test_dynamic_router_builder_with_config() {
2355 let config = DynamicRouterConfig::new(Arc::new(|_| Some("mock:a".to_string())))
2356 .max_iterations(100)
2357 .cache_size(500);
2358
2359 let definition = RouteBuilder::from("timer:tick")
2360 .route_id("test-route")
2361 .dynamic_router_with_config(config)
2362 .build()
2363 .unwrap();
2364
2365 assert_eq!(definition.steps().len(), 1);
2366 if let BuilderStep::DynamicRouter { config } = &definition.steps()[0] {
2367 assert_eq!(config.max_iterations, 100);
2368 assert_eq!(config.cache_size, 500);
2369 } else {
2370 panic!("Expected DynamicRouter step");
2371 }
2372 }
2373
2374 #[test]
2375 fn test_dynamic_router_step_after_router() {
2376 let definition = RouteBuilder::from("timer:tick")
2378 .route_id("test-route")
2379 .dynamic_router(Arc::new(|_| Some("mock:inner".to_string())))
2380 .to("mock:outer")
2381 .build()
2382 .unwrap();
2383
2384 assert_eq!(definition.steps().len(), 2);
2385 assert!(matches!(
2386 &definition.steps()[0],
2387 BuilderStep::DynamicRouter { .. }
2388 ));
2389 assert!(matches!(&definition.steps()[1], BuilderStep::To(uri) if uri == "mock:outer"));
2390 }
2391
2392 #[test]
2393 fn routing_slip_builder_creates_step() {
2394 use camel_api::RoutingSlipExpression;
2395
2396 let expression: RoutingSlipExpression = Arc::new(|_| Some("direct:a,direct:b".to_string()));
2397
2398 let route = RouteBuilder::from("direct:start")
2399 .route_id("routing-slip-test")
2400 .routing_slip(expression)
2401 .build()
2402 .unwrap();
2403
2404 assert!(
2405 matches!(route.steps()[0], BuilderStep::RoutingSlip { .. }),
2406 "Expected RoutingSlip step"
2407 );
2408 }
2409
2410 #[test]
2411 fn routing_slip_with_config_builder_creates_step() {
2412 use camel_api::RoutingSlipConfig;
2413
2414 let config = RoutingSlipConfig::new(Arc::new(|_| Some("mock:a".to_string())))
2415 .uri_delimiter("|")
2416 .cache_size(50)
2417 .ignore_invalid_endpoints(true);
2418
2419 let route = RouteBuilder::from("direct:start")
2420 .route_id("routing-slip-config-test")
2421 .routing_slip_with_config(config)
2422 .build()
2423 .unwrap();
2424
2425 if let BuilderStep::RoutingSlip { config } = &route.steps()[0] {
2426 assert_eq!(config.uri_delimiter, "|");
2427 assert_eq!(config.cache_size, 50);
2428 assert!(config.ignore_invalid_endpoints);
2429 } else {
2430 panic!("Expected RoutingSlip step");
2431 }
2432 }
2433
2434 #[test]
2435 fn test_builder_marshal_adds_processor_step() {
2436 let definition = RouteBuilder::from("timer:tick")
2437 .route_id("test-route")
2438 .marshal("json")
2439 .build()
2440 .unwrap();
2441 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2442 }
2443
2444 #[test]
2445 fn test_builder_unmarshal_adds_processor_step() {
2446 let definition = RouteBuilder::from("timer:tick")
2447 .route_id("test-route")
2448 .unmarshal("json")
2449 .build()
2450 .unwrap();
2451 assert!(matches!(&definition.steps()[0], BuilderStep::Processor(_)));
2452 }
2453
2454 #[test]
2455 #[should_panic(expected = "unknown data format: 'csv'")]
2456 fn test_builder_marshal_panics_on_unknown_format() {
2457 let _ = RouteBuilder::from("timer:tick")
2458 .route_id("test-route")
2459 .marshal("csv")
2460 .build();
2461 }
2462
2463 #[test]
2464 #[should_panic(expected = "unknown data format: 'csv'")]
2465 fn test_builder_unmarshal_panics_on_unknown_format() {
2466 let _ = RouteBuilder::from("timer:tick")
2467 .route_id("test-route")
2468 .unmarshal("csv")
2469 .build();
2470 }
2471}