1use camel_api::UnitOfWorkConfig;
5use camel_api::circuit_breaker::CircuitBreakerConfig;
6use camel_api::error_handler::ErrorHandlerConfig;
7use camel_api::loop_eip::LoopConfig;
8use camel_api::{AggregatorConfig, BoxProcessor, FilterPredicate, MulticastConfig, SplitterConfig};
9use camel_component_api::ConcurrencyModel;
10
11pub struct WhenStep {
13 pub predicate: FilterPredicate,
14 pub steps: Vec<BuilderStep>,
15}
16
17pub use camel_api::declarative::{LanguageExpressionDef, ValueSourceDef};
18
19#[derive(Debug)]
21pub struct DeclarativeWhenStep {
22 pub predicate: LanguageExpressionDef,
23 pub steps: Vec<BuilderStep>,
24}
25
26pub enum BuilderStep {
28 Processor(BoxProcessor),
30 To(String),
32 Stop,
34 Log {
36 level: camel_processor::LogLevel,
37 message: String,
38 },
39 DeclarativeSetHeader {
41 key: String,
42 value: ValueSourceDef,
43 },
44 DeclarativeSetBody {
46 value: ValueSourceDef,
47 },
48 DeclarativeFilter {
50 predicate: LanguageExpressionDef,
51 steps: Vec<BuilderStep>,
52 },
53 DeclarativeChoice {
55 whens: Vec<DeclarativeWhenStep>,
56 otherwise: Option<Vec<BuilderStep>>,
57 },
58 DeclarativeScript {
60 expression: LanguageExpressionDef,
61 },
62 DeclarativeFunction {
63 definition: camel_api::FunctionDefinition,
64 },
65 DeclarativeSplit {
67 expression: LanguageExpressionDef,
68 aggregation: camel_api::splitter::AggregationStrategy,
69 parallel: bool,
70 parallel_limit: Option<usize>,
71 stop_on_exception: bool,
72 steps: Vec<BuilderStep>,
73 },
74 DeclarativeDynamicRouter {
75 expression: LanguageExpressionDef,
76 uri_delimiter: String,
77 cache_size: i32,
78 ignore_invalid_endpoints: bool,
79 max_iterations: usize,
80 },
81 DeclarativeRoutingSlip {
82 expression: LanguageExpressionDef,
83 uri_delimiter: String,
84 cache_size: i32,
85 ignore_invalid_endpoints: bool,
86 },
87 Split {
89 config: SplitterConfig,
90 steps: Vec<BuilderStep>,
91 },
92 Aggregate {
94 config: AggregatorConfig,
95 },
96 Filter {
98 predicate: FilterPredicate,
99 steps: Vec<BuilderStep>,
100 },
101 Choice {
104 whens: Vec<WhenStep>,
105 otherwise: Option<Vec<BuilderStep>>,
106 },
107 WireTap {
109 uri: String,
110 },
111 Multicast {
113 steps: Vec<BuilderStep>,
114 config: MulticastConfig,
115 },
116 DeclarativeLog {
118 level: camel_processor::LogLevel,
119 message: ValueSourceDef,
120 },
121 Bean {
123 name: String,
124 method: String,
125 },
126 Script {
129 language: String,
130 script: String,
131 },
132 Throttle {
134 config: camel_api::ThrottlerConfig,
135 steps: Vec<BuilderStep>,
136 },
137 LoadBalance {
139 config: camel_api::LoadBalancerConfig,
140 steps: Vec<BuilderStep>,
141 },
142 DynamicRouter {
144 config: camel_api::DynamicRouterConfig,
145 },
146 RoutingSlip {
147 config: camel_api::RoutingSlipConfig,
148 },
149 RecipientList {
150 config: camel_api::recipient_list::RecipientListConfig,
151 },
152 DeclarativeRecipientList {
153 expression: LanguageExpressionDef,
154 delimiter: String,
155 parallel: bool,
156 parallel_limit: Option<usize>,
157 stop_on_exception: bool,
158 aggregation: String,
159 },
160 Delay {
161 config: camel_api::DelayConfig,
162 },
163 Loop {
165 config: LoopConfig,
166 steps: Vec<BuilderStep>,
167 },
168 DeclarativeLoop {
170 count: Option<usize>,
171 while_predicate: Option<LanguageExpressionDef>,
172 steps: Vec<BuilderStep>,
173 },
174}
175
176impl std::fmt::Debug for BuilderStep {
177 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
178 match self {
179 BuilderStep::Processor(_) => write!(f, "BuilderStep::Processor(...)"),
180 BuilderStep::To(uri) => write!(f, "BuilderStep::To({uri:?})"),
181 BuilderStep::Stop => write!(f, "BuilderStep::Stop"),
182 BuilderStep::Log { level, message } => write!(
183 f,
184 "BuilderStep::Log {{ level: {level:?}, message: {message:?} }}"
185 ),
186 BuilderStep::DeclarativeSetHeader { key, .. } => {
187 write!(
188 f,
189 "BuilderStep::DeclarativeSetHeader {{ key: {key:?}, .. }}"
190 )
191 }
192 BuilderStep::DeclarativeSetBody { .. } => {
193 write!(f, "BuilderStep::DeclarativeSetBody {{ .. }}")
194 }
195 BuilderStep::DeclarativeFilter { steps, .. } => {
196 write!(
197 f,
198 "BuilderStep::DeclarativeFilter {{ steps: {steps:?}, .. }}"
199 )
200 }
201 BuilderStep::DeclarativeChoice { whens, otherwise } => {
202 write!(
203 f,
204 "BuilderStep::DeclarativeChoice {{ whens: {} clause(s), otherwise: {} }}",
205 whens.len(),
206 if otherwise.is_some() { "Some" } else { "None" }
207 )
208 }
209 BuilderStep::DeclarativeScript { expression } => write!(
210 f,
211 "BuilderStep::DeclarativeScript {{ language: {:?}, .. }}",
212 expression.language
213 ),
214 BuilderStep::DeclarativeFunction { definition } => write!(
215 f,
216 "BuilderStep::DeclarativeFunction {{ id: {:?}, runtime: {:?}, .. }}",
217 definition.id, definition.runtime
218 ),
219 BuilderStep::DeclarativeSplit { steps, .. } => {
220 write!(
221 f,
222 "BuilderStep::DeclarativeSplit {{ steps: {steps:?}, .. }}"
223 )
224 }
225 BuilderStep::DeclarativeDynamicRouter { expression, .. } => write!(
226 f,
227 "BuilderStep::DeclarativeDynamicRouter {{ language: {:?}, .. }}",
228 expression.language
229 ),
230 BuilderStep::DeclarativeRoutingSlip { expression, .. } => write!(
231 f,
232 "BuilderStep::DeclarativeRoutingSlip {{ language: {:?}, .. }}",
233 expression.language
234 ),
235 BuilderStep::Split { steps, .. } => {
236 write!(f, "BuilderStep::Split {{ steps: {steps:?}, .. }}")
237 }
238 BuilderStep::Aggregate { .. } => write!(f, "BuilderStep::Aggregate {{ .. }}"),
239 BuilderStep::Filter { steps, .. } => {
240 write!(f, "BuilderStep::Filter {{ steps: {steps:?}, .. }}")
241 }
242 BuilderStep::Choice { whens, otherwise } => {
243 write!(
244 f,
245 "BuilderStep::Choice {{ whens: {} clause(s), otherwise: {} }}",
246 whens.len(),
247 if otherwise.is_some() { "Some" } else { "None" }
248 )
249 }
250 BuilderStep::WireTap { uri } => write!(f, "BuilderStep::WireTap {{ uri: {uri:?} }}"),
251 BuilderStep::Multicast { steps, .. } => {
252 write!(f, "BuilderStep::Multicast {{ steps: {steps:?}, .. }}")
253 }
254 BuilderStep::DeclarativeLog { level, .. } => {
255 write!(f, "BuilderStep::DeclarativeLog {{ level: {level:?}, .. }}")
256 }
257 BuilderStep::Bean { name, method } => {
258 write!(
259 f,
260 "BuilderStep::Bean {{ name: {name:?}, method: {method:?} }}"
261 )
262 }
263 BuilderStep::Script { language, .. } => {
264 write!(f, "BuilderStep::Script {{ language: {language:?}, .. }}")
265 }
266 BuilderStep::Throttle { steps, .. } => {
267 write!(f, "BuilderStep::Throttle {{ steps: {steps:?}, .. }}")
268 }
269 BuilderStep::LoadBalance { steps, .. } => {
270 write!(f, "BuilderStep::LoadBalance {{ steps: {steps:?}, .. }}")
271 }
272 BuilderStep::DynamicRouter { .. } => {
273 write!(f, "BuilderStep::DynamicRouter {{ .. }}")
274 }
275 BuilderStep::RoutingSlip { .. } => {
276 write!(f, "BuilderStep::RoutingSlip {{ .. }}")
277 }
278 BuilderStep::RecipientList { .. } => {
279 write!(f, "BuilderStep::RecipientList {{ .. }}")
280 }
281 BuilderStep::DeclarativeRecipientList {
282 expression,
283 aggregation,
284 ..
285 } => write!(
286 f,
287 "BuilderStep::DeclarativeRecipientList {{ language: {:?}, aggregation: {:?}, .. }}",
288 expression.language, aggregation
289 ),
290 BuilderStep::Delay { config } => {
291 write!(f, "BuilderStep::Delay {{ config: {:?} }}", config)
292 }
293 BuilderStep::Loop { config, steps } => {
294 write!(
295 f,
296 "BuilderStep::Loop {{ config: {:?}, steps: {} }}",
297 config.mode_name(),
298 steps.len()
299 )
300 }
301 BuilderStep::DeclarativeLoop {
302 count,
303 while_predicate,
304 steps,
305 } => {
306 write!(
307 f,
308 "BuilderStep::DeclarativeLoop {{ count: {:?}, while: {}, steps: {} }}",
309 count,
310 while_predicate.is_some(),
311 steps.len()
312 )
313 }
314 }
315 }
316}
317
318pub struct RouteDefinition {
320 pub(crate) from_uri: String,
321 pub(crate) steps: Vec<BuilderStep>,
322 pub(crate) error_handler: Option<ErrorHandlerConfig>,
324 pub(crate) circuit_breaker: Option<CircuitBreakerConfig>,
326 pub(crate) unit_of_work: Option<UnitOfWorkConfig>,
328 pub(crate) concurrency: Option<ConcurrencyModel>,
331 pub(crate) route_id: String,
333 pub(crate) auto_startup: bool,
335 pub(crate) startup_order: i32,
337 pub(crate) source_hash: Option<u64>,
338}
339
340impl RouteDefinition {
341 pub fn new(from_uri: impl Into<String>, steps: Vec<BuilderStep>) -> Self {
343 Self {
344 from_uri: from_uri.into(),
345 steps,
346 error_handler: None,
347 circuit_breaker: None,
348 unit_of_work: None,
349 concurrency: None,
350 route_id: String::new(), auto_startup: true,
352 startup_order: 1000,
353 source_hash: None,
354 }
355 }
356
357 pub fn from_uri(&self) -> &str {
359 &self.from_uri
360 }
361
362 pub fn steps(&self) -> &[BuilderStep] {
364 &self.steps
365 }
366
367 pub fn with_error_handler(mut self, config: ErrorHandlerConfig) -> Self {
369 self.error_handler = Some(config);
370 self
371 }
372
373 pub fn error_handler_config(&self) -> Option<&ErrorHandlerConfig> {
375 self.error_handler.as_ref()
376 }
377
378 pub fn with_circuit_breaker(mut self, config: CircuitBreakerConfig) -> Self {
380 self.circuit_breaker = Some(config);
381 self
382 }
383
384 pub fn with_unit_of_work(mut self, config: UnitOfWorkConfig) -> Self {
386 self.unit_of_work = Some(config);
387 self
388 }
389
390 pub fn unit_of_work_config(&self) -> Option<&UnitOfWorkConfig> {
392 self.unit_of_work.as_ref()
393 }
394
395 pub fn circuit_breaker_config(&self) -> Option<&CircuitBreakerConfig> {
397 self.circuit_breaker.as_ref()
398 }
399
400 pub fn concurrency_override(&self) -> Option<&ConcurrencyModel> {
402 self.concurrency.as_ref()
403 }
404
405 pub fn with_concurrency(mut self, model: ConcurrencyModel) -> Self {
407 self.concurrency = Some(model);
408 self
409 }
410
411 pub fn route_id(&self) -> &str {
413 &self.route_id
414 }
415
416 pub fn auto_startup(&self) -> bool {
418 self.auto_startup
419 }
420
421 pub fn startup_order(&self) -> i32 {
423 self.startup_order
424 }
425
426 pub fn with_route_id(mut self, id: impl Into<String>) -> Self {
428 self.route_id = id.into();
429 self
430 }
431
432 pub fn with_auto_startup(mut self, auto: bool) -> Self {
434 self.auto_startup = auto;
435 self
436 }
437
438 pub fn with_startup_order(mut self, order: i32) -> Self {
440 self.startup_order = order;
441 self
442 }
443
444 pub fn with_source_hash(mut self, hash: u64) -> Self {
445 self.source_hash = Some(hash);
446 self
447 }
448
449 pub fn source_hash(&self) -> Option<u64> {
450 self.source_hash
451 }
452
453 pub fn to_info(&self) -> RouteDefinitionInfo {
456 RouteDefinitionInfo {
457 route_id: self.route_id.clone(),
458 auto_startup: self.auto_startup,
459 startup_order: self.startup_order,
460 source_hash: self.source_hash,
461 }
462 }
463}
464
/// Metadata-only snapshot of a route definition: route id, auto-startup flag,
/// startup order and optional source hash.
///
/// It holds only plain data, so it can be cloned and debug-printed freely.
#[derive(Clone, Debug)]
pub struct RouteDefinitionInfo {
    /// Route identifier.
    route_id: String,
    /// Auto-startup flag.
    auto_startup: bool,
    /// Startup order value.
    startup_order: i32,
    /// Optional source hash; crate-visible so crate code can access it directly.
    pub(crate) source_hash: Option<u64>,
}

impl RouteDefinitionInfo {
    /// Returns the route id.
    pub fn route_id(&self) -> &str {
        &self.route_id
    }

    /// Returns the auto-startup flag.
    pub fn auto_startup(&self) -> bool {
        self.auto_startup
    }

    /// Returns the startup order value.
    pub fn startup_order(&self) -> i32 {
        self.startup_order
    }

    /// Returns the source hash, if one was recorded.
    pub fn source_hash(&self) -> Option<u64> {
        self.source_hash
    }
}
498
#[cfg(test)]
mod tests {
    use super::*;

    /// Expression fixture shared by the declarative-variant tests.
    fn simple_body_expr() -> LanguageExpressionDef {
        LanguageExpressionDef {
            language: "simple".into(),
            source: "${body}".into(),
        }
    }

    #[test]
    fn test_builder_step_multicast_variant() {
        use camel_api::MulticastConfig;

        let step = BuilderStep::Multicast {
            steps: vec![BuilderStep::To("direct:a".into())],
            config: MulticastConfig::new(),
        };

        assert!(matches!(step, BuilderStep::Multicast { .. }));
    }

    #[test]
    fn test_route_definition_defaults() {
        let def = RouteDefinition::new("direct:test", vec![]).with_route_id("test-route");
        assert_eq!(def.route_id(), "test-route");
        assert!(def.auto_startup());
        assert_eq!(def.startup_order(), 1000);
        // Nothing optional is attached by `new`.
        assert!(def.error_handler_config().is_none());
        assert!(def.circuit_breaker_config().is_none());
        assert!(def.unit_of_work_config().is_none());
        assert!(def.concurrency_override().is_none());
        assert_eq!(def.source_hash(), None);
    }

    #[test]
    fn test_route_definition_builders() {
        let def = RouteDefinition::new("direct:test", vec![])
            .with_route_id("my-route")
            .with_auto_startup(false)
            .with_startup_order(50);
        assert_eq!(def.route_id(), "my-route");
        assert!(!def.auto_startup());
        assert_eq!(def.startup_order(), 50);
    }

    #[test]
    fn test_route_definition_accessors_cover_core_fields() {
        let def = RouteDefinition::new("direct:in", vec![BuilderStep::To("mock:out".into())])
            .with_route_id("accessor-route");

        assert_eq!(def.from_uri(), "direct:in");
        assert_eq!(def.steps().len(), 1);
        assert!(matches!(def.steps()[0], BuilderStep::To(_)));
    }

    #[test]
    fn test_route_definition_error_handler_circuit_breaker_and_concurrency_accessors() {
        use camel_api::circuit_breaker::CircuitBreakerConfig;
        use camel_api::error_handler::ErrorHandlerConfig;
        use camel_component_api::ConcurrencyModel;

        let def = RouteDefinition::new("direct:test", vec![])
            .with_route_id("eh-route")
            .with_error_handler(ErrorHandlerConfig::dead_letter_channel("log:dlc"))
            .with_circuit_breaker(CircuitBreakerConfig::new())
            .with_concurrency(ConcurrencyModel::Concurrent { max: Some(4) });

        let eh = def
            .error_handler_config()
            .expect("error handler should be set");
        assert_eq!(eh.dlc_uri.as_deref(), Some("log:dlc"));
        assert!(def.circuit_breaker_config().is_some());
        assert!(matches!(
            def.concurrency_override(),
            Some(ConcurrencyModel::Concurrent { max: Some(4) })
        ));
    }

    #[test]
    fn test_route_definition_source_hash_round_trip() {
        let def = RouteDefinition::new("direct:test", vec![]);
        assert_eq!(def.source_hash(), None);

        let def = def.with_source_hash(0xDEAD_BEEF);
        assert_eq!(def.source_hash(), Some(0xDEAD_BEEF));
        // The hash must survive the projection into the metadata view.
        assert_eq!(def.to_info().source_hash(), Some(0xDEAD_BEEF));
    }

    #[test]
    fn test_builder_step_debug_covers_many_variants() {
        use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};
        use camel_api::{
            DynamicRouterConfig, Exchange, IdentityProcessor, RoutingSlipConfig, Value,
        };
        use std::sync::Arc;

        let expr = simple_body_expr();

        let steps = vec![
            BuilderStep::Processor(BoxProcessor::new(IdentityProcessor)),
            BuilderStep::To("mock:out".into()),
            BuilderStep::Stop,
            BuilderStep::Log {
                level: camel_processor::LogLevel::Info,
                message: "hello".into(),
            },
            BuilderStep::DeclarativeSetHeader {
                key: "k".into(),
                value: ValueSourceDef::Literal(Value::String("v".into())),
            },
            BuilderStep::DeclarativeSetBody {
                value: ValueSourceDef::Expression(expr.clone()),
            },
            BuilderStep::DeclarativeFilter {
                predicate: expr.clone(),
                steps: vec![BuilderStep::Stop],
            },
            BuilderStep::DeclarativeChoice {
                whens: vec![DeclarativeWhenStep {
                    predicate: expr.clone(),
                    steps: vec![BuilderStep::Stop],
                }],
                otherwise: Some(vec![BuilderStep::Stop]),
            },
            BuilderStep::DeclarativeScript {
                expression: expr.clone(),
            },
            BuilderStep::DeclarativeSplit {
                expression: expr.clone(),
                aggregation: AggregationStrategy::Original,
                parallel: false,
                parallel_limit: Some(2),
                stop_on_exception: true,
                steps: vec![BuilderStep::Stop],
            },
            BuilderStep::DeclarativeDynamicRouter {
                expression: expr.clone(),
                uri_delimiter: ",".into(),
                cache_size: 10,
                ignore_invalid_endpoints: false,
                max_iterations: 5,
            },
            BuilderStep::DeclarativeRoutingSlip {
                expression: expr.clone(),
                uri_delimiter: ",".into(),
                cache_size: 10,
                ignore_invalid_endpoints: false,
            },
            BuilderStep::Split {
                config: SplitterConfig::new(split_body_lines()),
                steps: vec![BuilderStep::Stop],
            },
            BuilderStep::Aggregate {
                config: camel_api::AggregatorConfig::correlate_by("id")
                    .complete_when_size(1)
                    .build(),
            },
            BuilderStep::Filter {
                predicate: Arc::new(|_: &Exchange| true),
                steps: vec![BuilderStep::Stop],
            },
            BuilderStep::WireTap {
                uri: "mock:tap".into(),
            },
            BuilderStep::Multicast {
                steps: vec![BuilderStep::Stop],
                config: MulticastConfig::new(),
            },
            BuilderStep::DeclarativeLog {
                level: camel_processor::LogLevel::Info,
                message: ValueSourceDef::Expression(expr.clone()),
            },
            BuilderStep::Bean {
                name: "bean".into(),
                method: "call".into(),
            },
            BuilderStep::Script {
                language: "rhai".into(),
                script: "body".into(),
            },
            BuilderStep::Throttle {
                config: camel_api::ThrottlerConfig::new(10, std::time::Duration::from_millis(10)),
                steps: vec![BuilderStep::Stop],
            },
            BuilderStep::LoadBalance {
                config: camel_api::LoadBalancerConfig::round_robin(),
                steps: vec![BuilderStep::To("mock:l1".into())],
            },
            BuilderStep::DynamicRouter {
                config: DynamicRouterConfig::new(Arc::new(|_| Some("mock:dr".into()))),
            },
            BuilderStep::RoutingSlip {
                config: RoutingSlipConfig::new(Arc::new(|_| Some("mock:rs".into()))),
            },
            BuilderStep::DeclarativeRecipientList {
                expression: expr.clone(),
                delimiter: ",".into(),
                parallel: false,
                parallel_limit: None,
                stop_on_exception: false,
                aggregation: "original".into(),
            },
            BuilderStep::DeclarativeLoop {
                count: Some(2),
                while_predicate: Some(expr.clone()),
                steps: vec![BuilderStep::Stop],
            },
        ];

        for step in steps {
            let dbg = format!("{step:?}");
            assert!(!dbg.is_empty());
            // Every arm of the hand-written impl labels itself with the enum name.
            assert!(
                dbg.starts_with("BuilderStep::"),
                "unexpected debug output: {dbg}"
            );
        }
    }

    #[test]
    fn test_builder_step_debug_exact_output_for_simple_variants() {
        assert_eq!(format!("{:?}", BuilderStep::Stop), "BuilderStep::Stop");
        assert_eq!(
            format!("{:?}", BuilderStep::To("mock:out".into())),
            "BuilderStep::To(\"mock:out\")"
        );
        assert_eq!(
            format!(
                "{:?}",
                BuilderStep::WireTap {
                    uri: "mock:tap".into()
                }
            ),
            "BuilderStep::WireTap { uri: \"mock:tap\" }"
        );
        assert_eq!(
            format!(
                "{:?}",
                BuilderStep::Bean {
                    name: "bean".into(),
                    method: "call".into()
                }
            ),
            "BuilderStep::Bean { name: \"bean\", method: \"call\" }"
        );
    }

    #[test]
    fn test_declarative_loop_builder_step_debug() {
        let step = BuilderStep::DeclarativeLoop {
            count: Some(3),
            while_predicate: None,
            steps: vec![BuilderStep::Stop],
        };
        assert_eq!(
            format!("{step:?}"),
            "BuilderStep::DeclarativeLoop { count: Some(3), while: false, steps: 1 }"
        );

        let guarded = BuilderStep::DeclarativeLoop {
            count: None,
            while_predicate: Some(simple_body_expr()),
            steps: vec![],
        };
        assert_eq!(
            format!("{guarded:?}"),
            "BuilderStep::DeclarativeLoop { count: None, while: true, steps: 0 }"
        );
    }

    #[test]
    fn test_route_definition_to_info_preserves_metadata() {
        let info = RouteDefinition::new("direct:test", vec![])
            .with_route_id("meta-route")
            .with_auto_startup(false)
            .with_startup_order(7)
            .to_info();

        assert_eq!(info.route_id(), "meta-route");
        assert!(!info.auto_startup());
        assert_eq!(info.startup_order(), 7);
        // No hash was recorded on the definition, so none is reported.
        assert!(info.source_hash().is_none());
    }

    #[test]
    fn test_choice_builder_step_debug() {
        use camel_api::{Exchange, FilterPredicate};
        use std::sync::Arc;

        fn always_true(_: &Exchange) -> bool {
            true
        }

        let step = BuilderStep::Choice {
            whens: vec![WhenStep {
                predicate: Arc::new(always_true) as FilterPredicate,
                steps: vec![BuilderStep::To("mock:a".into())],
            }],
            otherwise: None,
        };
        let debug = format!("{step:?}");
        assert!(debug.contains("Choice"));
        assert_eq!(
            debug,
            "BuilderStep::Choice { whens: 1 clause(s), otherwise: None }"
        );
    }

    #[test]
    fn test_route_definition_unit_of_work() {
        use camel_api::UnitOfWorkConfig;
        let config = UnitOfWorkConfig {
            on_complete: Some("log:complete".into()),
            on_failure: Some("log:failed".into()),
        };
        let def = RouteDefinition::new("direct:test", vec![])
            .with_route_id("uow-test")
            .with_unit_of_work(config);
        assert_eq!(
            def.unit_of_work_config().unwrap().on_complete.as_deref(),
            Some("log:complete")
        );
        assert_eq!(
            def.unit_of_work_config().unwrap().on_failure.as_deref(),
            Some("log:failed")
        );

        let def_no_uow = RouteDefinition::new("direct:test", vec![]).with_route_id("no-uow");
        assert!(def_no_uow.unit_of_work_config().is_none());
    }
}